From a8541c15b92a99fb06625bd4dde7f881e6d5745c Mon Sep 17 00:00:00 2001 From: Ruibiao Chen Date: Wed, 31 Aug 2022 11:04:20 +0800 Subject: [PATCH] Refactor ExecutorCache (#45532) * Refactor ExecutorCache * Update code * Fix mkldnn UT errors * Fix typos * Fix CI errors --- python/paddle/fluid/executor.py | 378 ++++++++++-------- .../fluid/tests/unittests/CMakeLists.txt | 5 + 2 files changed, 220 insertions(+), 163 deletions(-) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 5f96d7f06c..1db88cf7f3 100755 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -37,6 +37,8 @@ from . import framework from .incubate.checkpoint import auto_checkpoint as acp from .compiler import _prune_feed_ops +from functools import lru_cache + __all__ = ['Executor', 'global_scope', 'scope_guard'] g_scope = core.Scope() @@ -335,6 +337,84 @@ def has_fetch_operators(block, return fetch_count > 0 +def _add_feed_fetch_ops(program, + feed, + fetch_list, + feed_var_name, + fetch_var_name, + use_fetch_v2=False): + tmp_program = program.clone() + + global_block = tmp_program.global_block() + + if feed_var_name in global_block.vars: + feed_var = global_block.var(feed_var_name) + else: + feed_var = global_block.create_var( + name=feed_var_name, + type=core.VarDesc.VarType.FEED_MINIBATCH, + persistable=True) + + if fetch_var_name in global_block.vars: + fetch_var = global_block.var(fetch_var_name) + else: + fetch_var = global_block.create_var( + name=fetch_var_name, + type=core.VarDesc.VarType.FETCH_LIST, + persistable=True) + + # prepend feed operators + if not has_feed_operators(global_block, feed, feed_var_name): + for i, name in enumerate(feed): + if global_block.has_var(name): + out = global_block.var(name) + global_block._prepend_op(type='feed', + inputs={'X': [feed_var]}, + outputs={'Out': [out]}, + attrs={'col': i}) + else: + warnings.warn( + "The variable %s is not found in program. It is not declared or is pruned." 
+                    % name)
+
+    if use_fetch_v2:
+        fetch_op = 'fetch_v2'
+    else:
+        fetch_op = 'fetch'
+
+    # append fetch operators
+    if not has_fetch_operators(global_block, fetch_list, fetch_var_name,
+                               fetch_op):
+        for i, var in enumerate(fetch_list):
+            assert isinstance(var, Variable) or isinstance(
+                var, six.string_types), ("Wrong type for fetch_list[%s]: %s" %
+                                         (i, type(var)))
+            global_block.append_op(type=fetch_op,
+                                   inputs={'X': [var]},
+                                   outputs={'Out': [fetch_var]},
+                                   attrs={'col': i})
+
+    return tmp_program
+
+
+def _apply_inplace_addto_pass(program, enable_inplace, enable_addto,
+                              skip_var_names):
+    use_cuda = True if core.is_compiled_with_cuda() else False
+
+    attrs = {"use_cuda": use_cuda, "mem_opt_skip_vars": skip_var_names}
+    attr_types = {"use_cuda": "bool", "mem_opt_skip_vars": "list[str]"}
+
+    empty_startup_program = Program()
+    if enable_inplace:
+        pass_name = "buffer_shared_inplace_pass"
+        _apply_pass(program, empty_startup_program, pass_name, attrs,
+                    attr_types)
+    if enable_addto and use_cuda:
+        pass_name = "inplace_addto_op_pass"
+        _apply_pass(program, empty_startup_program, pass_name, attrs,
+                    attr_types)
+
+
 def _fetch_var(name, scope=None, return_numpy=True):
     """
     Fetch the value of the variable with the given name from the
@@ -613,10 +693,114 @@ class _StandaloneExecutor(object):
 
 
 class _ExecutorCache(object):
 
-    def __init__(self, place):
-        # {Program : _StandaloneExecutor}
-        self._place = place
-        self._cached_executors = {}
+    class _CachedData(object):
+
+        def __init__(self, program, feed, fetch_list, feed_var_name,
+                     fetch_var_name, place, scope):
+            self.program = program
+            self.feed = feed
+            self.fetch_list = fetch_list
+            self.feed_var_name = feed_var_name
+            self.fetch_var_name = fetch_var_name
+            self.place = place
+            self.scope = scope
+
+            # NOTE(Ruibiao): Not all changeable items are considered for the key
+            # at present, ONLY: program, feed, and fetch_list
+            if isinstance(self.program, compiler.CompiledProgram):
+                self.key = hash(
+                    _get_strong_program_cache_key_for_new_exe(
+                        self.program._program, feed, fetch_list))
+            else:
+                self.key = hash(
+                    _get_strong_program_cache_key_for_new_exe(
+                        self.program, feed, fetch_list))
+
+        def __eq__(self, other):
+            return isinstance(
+                other, _ExecutorCache._CachedData) and self.key == other.key
+
+        def __hash__(self):
+            return self.key
+
+    def __init__(self):
+        # NOTE(Ruibiao): Wrap the lru_cache in the constructor so that the cache
+        # is local to this _ExecutorCache instance; otherwise, a global cache
+        # might not be released after the Executor instance is deleted
+        self._get_cached_program_and_executor = lru_cache(maxsize=8)(
+            self._get_program_and_executor)
+
+    def clear(self):
+        self._get_cached_program_and_executor.cache_clear()
+
+    def get_program_and_executor(self, program, feed, fetch_list, feed_var_name,
+                                 fetch_var_name, place, scope):
+        return self._get_cached_program_and_executor(
+            self._CachedData(program, feed, fetch_list, feed_var_name,
+                             fetch_var_name, place, scope))
+
+    def _get_program_and_executor(self, cached_data):
+        program = cached_data.program
+        inner_program = program._program if isinstance(
+            program, compiler.CompiledProgram) else program
+        feed = cached_data.feed
+        fetch_list = cached_data.fetch_list
+        feed_var_name = cached_data.feed_var_name
+        fetch_var_name = cached_data.fetch_var_name
+        place = cached_data.place
+        scope = cached_data.scope
+
+        # To apply IR passes, compile the Program to IrGraph and convert it back to Program
+        if isinstance(program, compiler.CompiledProgram) or isinstance(
+                program._graph, compiler.CompiledProgram):
+            compiled_program = program if isinstance(
+                program, compiler.CompiledProgram) else program._graph
+            build_strategy = compiled_program._build_strategy
+            # print(f"Program before convert:\n {inner_program}", flush=True)
+            compiled_program._compile(scope, place)
+            ir_graph = framework.IrGraph(compiled_program._graph)
+            converted_program = ir_graph.to_program()
+
+            if hasattr(inner_program, 'lr_sheduler'):
+                converted_program.lr_sheduler = inner_program.lr_sheduler
+
+            inner_program = converted_program
+            # print(f"Program after convert:\n {inner_program}", flush=True)
+            warnings.warn(
+                "FLAGS_USE_STANDALONE_EXECUTOR and FLAGS_CONVERT_GRAPH_TO_PROGRAM are set to 1. The Graph will be converted to a Program and executed by the new executor."
+            )
+        else:
+            build_strategy = None
+            from paddle.incubate.autograd import prim_enabled, prim2orig
+            if prim_enabled() and program == default_main_program():
+                prim2orig()
+
+            inner_program = program
+
+        program = _add_feed_fetch_ops(program=inner_program,
+                                      feed=feed,
+                                      fetch_list=fetch_list,
+                                      feed_var_name=feed_var_name,
+                                      fetch_var_name=fetch_var_name,
+                                      use_fetch_v2=True)
+
+        # If there are multiple blocks in the program, sub-blocks will not be
+        # executed by the new executor for now
+        if program.num_blocks > 1:
+            warnings.warn("There is more than one block in the program.")
+
+        # The standalone executor applies buffer_shared_inplace_pass and
+        # inplace_addto_op_pass to the program according to build_strategy
+        enable_inplace = True if build_strategy is None or build_strategy.enable_inplace else False
+        enable_addto = True if build_strategy is not None and build_strategy.enable_addto else False
+        if enable_inplace or enable_addto:
+            # inplace passes should skip the feed and fetch vars
+            skip_var_names = eval(_get_program_cache_key(feed, fetch_list))
+            _apply_inplace_addto_pass(program, enable_inplace, enable_addto,
+                                      skip_var_names)
+
+        new_program = program.clone()
+        new_exe = _StandaloneExecutor(place, new_program, scope)
+        return new_program, new_exe
 
 
 class Executor(object):
@@ -720,13 +904,19 @@ class Executor(object):
 
         # NOTE: Whether to use experimental executor `StandaloneExecutor`.
         self._enable_interpreter_core = _is_enable_standalone_executor()
-        self._executor_cache = _ExecutorCache(self.place)
+        self._executor_cache = _ExecutorCache()
 
         self._fleet_executor = None
         # TODO(liyurui): This option will be removed and always true when the functionality
         # of fleet executor with standalone executor is ready.
        self._fleet_executor_with_standalone = False
 
+    def __del__(self):
+        # NOTE(Ruibiao): Manually calling clear here is required because, in Python,
+        # executor_cache may not be destructed immediately after the Executor instance
+        # is deleted (nor is the _StandaloneExecutor), which causes errors in mkl-dnn
+        # unit tests (see ClearMKLDNNCache in interpretercore.cc for why).
+ self._executor_cache.clear() + def _get_scope_cache(self, program_cache_key): return self.scope_caches.get(program_cache_key, None) @@ -763,67 +953,6 @@ class Executor(object): def _add_scope_cache(self, scope_cache_key, scope): self.scope_caches[scope_cache_key] = scope - def _add_feed_fetch_ops(self, - program, - feed, - fetch_list, - feed_var_name, - fetch_var_name, - use_fetch_v2=False): - tmp_program = program.clone() - - global_block = tmp_program.global_block() - - if feed_var_name in global_block.vars: - feed_var = global_block.var(feed_var_name) - else: - feed_var = global_block.create_var( - name=feed_var_name, - type=core.VarDesc.VarType.FEED_MINIBATCH, - persistable=True) - - if fetch_var_name in global_block.vars: - fetch_var = global_block.var(fetch_var_name) - else: - fetch_var = global_block.create_var( - name=fetch_var_name, - type=core.VarDesc.VarType.FETCH_LIST, - persistable=True) - - # prepend feed operators - if not has_feed_operators(global_block, feed, feed_var_name): - for i, name in enumerate(feed): - if global_block.has_var(name): - out = global_block.var(name) - global_block._prepend_op(type='feed', - inputs={'X': [feed_var]}, - outputs={'Out': [out]}, - attrs={'col': i}) - else: - warnings.warn( - "The variable %s is not found in program. It is not declared or is pruned." - % name) - - if use_fetch_v2: - fetch_op = 'fetch_v2' - else: - fetch_op = 'fetch' - - # append fetch_operators - if not has_fetch_operators(global_block, fetch_list, fetch_var_name, - fetch_op): - for i, var in enumerate(fetch_list): - assert isinstance(var, Variable) or isinstance( - var, - six.string_types), ("Wrong type for fetch_list[%s]: %s" % - (i, type(var))) - global_block.append_op(type=fetch_op, - inputs={'X': [var]}, - outputs={'Out': [fetch_var]}, - attrs={'col': i}) - - return tmp_program - def _feed_data(self, program, feed, feed_var_name, scope): # feed var to framework global_block = program.global_block() @@ -1482,23 +1611,6 @@ class Executor(object): assert isinstance(program, Program) return True - def _apply_inplace_addto_pass(program, enable_inplace, enable_addto, - skip_var_names): - use_cuda = True if core.is_compiled_with_cuda() else False - - attrs = {"use_cuda": use_cuda, "mem_opt_skip_vars": skip_var_names} - attr_types = {"use_cuda": "bool", "mem_opt_skip_vars": "list[str]"} - - empty_startup_program = Program() - if enable_inplace: - pass_name = "buffer_shared_inplace_pass" - _apply_pass(program, empty_startup_program, pass_name, attrs, - attr_types) - if enable_addto and use_cuda: - pass_name = "inplace_addto_op_pass" - _apply_pass(program, empty_startup_program, pass_name, attrs, - attr_types) - # NOTE: This is an experimental feature. If `export FLAGS_USE_STANDALONE_EXECUTOR=1 `, # use StandaloneExecutor to run the program. 
if return_merged and self._enable_interpreter_core and _can_use_interpreter_core( @@ -1517,68 +1629,9 @@ class Executor(object): % (type(feed))) feed = self._update_feed(program, feed) - key = _get_strong_program_cache_key_for_new_exe( - inner_program, feed, fetch_list) - - # a little bit tricy here, use inner_program before _add_feed_fetch_ops to get key - # while use program to geet _StandaloneExecutor - if key not in self._executor_cache._cached_executors: - # To apply IR pass, compile the Program to IrGraph and convert it back to Program - if isinstance(program, - compiler.CompiledProgram) or isinstance( - program._graph, compiler.CompiledProgram): - compiled_program = program if isinstance( - program, - compiler.CompiledProgram) else program._graph - build_strategy = compiled_program._build_strategy - # print(f"Program before convert:\n {inner_program}", flush=True) - compiled_program._compile(scope, self.place) - ir_graph = framework.IrGraph(compiled_program._graph) - converted_program = ir_graph.to_program() - if hasattr(inner_program, 'lr_sheduler'): - converted_program.lr_sheduler = inner_program.lr_sheduler - inner_program = converted_program - # print(f"Program after convert:\n {inner_program}", flush=True) - warnings.warn( - "FLAGS_USE_STANDALONE_EXECUTOR and FLAGS_CONVERT_GRAPH_TO_PROGRAM is set to 1. Graph will be converted to Program and executed using new executor." - ) - else: - build_strategy = None - from paddle.incubate.autograd import prim_enabled, prim2orig - if prim_enabled() and program == default_main_program(): - prim2orig() - - program = self._add_feed_fetch_ops( - program=inner_program, - feed=feed, - fetch_list=fetch_list, - feed_var_name=feed_var_name, - fetch_var_name=fetch_var_name, - use_fetch_v2=True) - - # If there are multiple blocks in the program, subblock will not be - # executed with the new executor in temporary - if program.num_blocks > 1: - warnings.warn("There are more than 1 block in program.") - - # standalone executor will apply buffer_shared_inplace_pass and - # inplace_addto_op_pass to program according to build_strategy - enable_inplace = True if build_strategy is None or build_strategy.enable_inplace else False - enable_addto = True if build_strategy is not None and build_strategy.enable_addto else False - if enable_inplace or enable_addto: - # inplace should skip feed and fetch var - skip_var_names = eval( - _get_program_cache_key(feed, fetch_list)) - _apply_inplace_addto_pass(program, enable_inplace, - enable_addto, skip_var_names) - - new_program = program.clone() - new_exe = _StandaloneExecutor(self.place, new_program, - scope) - self._executor_cache._cached_executors[key] = (new_program, - new_exe) - - program, new_exe = self._executor_cache._cached_executors[key] + program, new_exe = self._executor_cache.get_program_and_executor( + program, feed, fetch_list, feed_var_name, fetch_var_name, + self.place, scope) self._feed_data(program, feed, feed_var_name, scope) if hasattr(program, 'lr_sheduler'): @@ -1703,7 +1756,7 @@ class Executor(object): cached_ctx = self._get_ctx_cache(cache_key) cached_scope = self._get_scope_cache(cache_key) if cached_program is None: - cached_program = self._add_feed_fetch_ops( + cached_program = _add_feed_fetch_ops( program=program, feed=feed, fetch_list=fetch_list, @@ -1727,11 +1780,11 @@ class Executor(object): ctx = cached_ctx scope = cached_scope else: - program = self._add_feed_fetch_ops(program=program, - feed=feed, - fetch_list=fetch_list, - feed_var_name=feed_var_name, - 
fetch_var_name=fetch_var_name) + program = _add_feed_fetch_ops(program=program, + feed=feed, + fetch_list=fetch_list, + feed_var_name=feed_var_name, + fetch_var_name=fetch_var_name) self._feed_data(program, feed, feed_var_name, scope) if hasattr(program, 'lr_sheduler'): @@ -1965,7 +2018,7 @@ class Executor(object): if fetch_var_name in real_program.global_block().vars: real_fetch_list.append(fetch_var) - program._pipeline_opt["section_program"] = self._add_feed_fetch_ops( + program._pipeline_opt["section_program"] = _add_feed_fetch_ops( program=program._pipeline_opt["section_program"], feed=[], fetch_list=real_fetch_list, @@ -2095,11 +2148,11 @@ class Executor(object): if fetch_var_name in real_program.global_block().vars: real_fetch_list.append(fetch_var) - real_program = self._add_feed_fetch_ops(program=real_program, - feed=[], - fetch_list=real_fetch_list, - feed_var_name='feed', - fetch_var_name='fetch') + real_program = _add_feed_fetch_ops(program=real_program, + feed=[], + fetch_list=real_fetch_list, + feed_var_name='feed', + fetch_var_name='fetch') main_block = real_program.block(0) for op in main_block.ops: # set the op_role of fetch op to Optimize to avoid @@ -2219,12 +2272,11 @@ class Executor(object): real_program = program if "section_program" in program._pipeline_opt: real_program = program._pipeline_opt["section_program"] - cached_program = self._add_feed_fetch_ops( - program=real_program, - feed=real_feed, - fetch_list=fetch_list, - feed_var_name=feed_var_name, - fetch_var_name=fetch_var_name) + cached_program = _add_feed_fetch_ops(program=real_program, + feed=real_feed, + fetch_list=fetch_list, + feed_var_name=feed_var_name, + fetch_var_name=fetch_var_name) main_block = cached_program.block(0) for op in main_block.ops: # set the op_role of fetch op to Optimize to avoid diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 1c413d72cd..d3a62ae985 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1381,6 +1381,11 @@ py_test_modules( test_eager_deletion_padding_rnn_for_interpretercore MODULES test_eager_deletion_padding_rnn ENVS FLAGS_CONVERT_GRAPH_TO_PROGRAM=true) +set_tests_properties( + test_buffer_shared_memory_reuse_pass + test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass + PROPERTIES ENVIRONMENT FLAGS_CONVERT_GRAPH_TO_PROGRAM=true) + # ExecutionStrategy is deprecated in standalone executor set_tests_properties(test_parallel_executor_dry_run PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0") -- GitLab
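
The core pattern this patch introduces in _ExecutorCache is an instance-local
functools.lru_cache keyed by a wrapper object whose __hash__/__eq__ reduce the
run arguments to one precomputed key. A minimal, self-contained sketch of that
pattern follows; the names (DemoCache, _Key, _build) are hypothetical and for
illustration only, not part of the Paddle API.

from functools import lru_cache


class _Key(object):
    # Reduce the (possibly unhashable) run arguments to a single precomputed
    # hash, mirroring what _ExecutorCache._CachedData does above.
    def __init__(self, program_text, feed_names, fetch_names):
        self.key = hash((program_text, tuple(feed_names), tuple(fetch_names)))

    def __eq__(self, other):
        return isinstance(other, _Key) and self.key == other.key

    def __hash__(self):
        return self.key


class DemoCache(object):

    def __init__(self):
        # Wrapping lru_cache around the *bound* method makes the cache
        # per-instance; decorating the method at class level would create one
        # cache shared by (and keeping alive) every instance.
        self._cached_build = lru_cache(maxsize=8)(self._build)

    def clear(self):
        self._cached_build.cache_clear()

    def get(self, program_text, feed_names, fetch_names):
        return self._cached_build(_Key(program_text, feed_names, fetch_names))

    def _build(self, key):
        # Stand-in for the expensive work _get_program_and_executor does:
        # adding feed/fetch ops, applying passes, building an executor.
        return ["artifact for key %d" % key.key]


cache = DemoCache()
a = cache.get("main_program", ["x"], ["out"])
b = cache.get("main_program", ["x"], ["out"])  # cache hit; _build ran once
assert a is b
cache.clear()  # analogous to Executor.__del__ -> _ExecutorCache.clear()

Because the lru_cache wrapper holds a reference to the bound method (and thus
to the instance), an explicit cache_clear() is the reliable way to release the
cached values promptly; this is why the patch adds Executor.__del__ above.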