From a5036775a954696a25abb8bfc53a7694c26456e7 Mon Sep 17 00:00:00 2001
From: Aurelius84
Date: Wed, 4 Mar 2020 15:03:59 +0800
Subject: [PATCH] Add program_cache in dygrapht_to_static (#22766)

---
 .../dygraph/dygraph_to_static/__init__.py     |   4 +
 .../dygraph_to_static/ast_transformer.py      |  25 +-
 .../dygraph_to_static/cache_program.py        | 345 ++++++++++++++++++
 python/paddle/fluid/dygraph/jit.py            |  79 +---
 .../dygraph_to_static/test_cache_program.py   | 115 ++++++
 .../dygraph_to_static/test_fetch_feed.py      |  11 +-
 6 files changed, 512 insertions(+), 67 deletions(-)
 create mode 100644 python/paddle/fluid/dygraph/dygraph_to_static/cache_program.py
 create mode 100644 python/paddle/fluid/tests/unittests/dygraph_to_static/test_cache_program.py

diff --git a/python/paddle/fluid/dygraph/dygraph_to_static/__init__.py b/python/paddle/fluid/dygraph/dygraph_to_static/__init__.py
index fab33cbd137..9df7cb4e3c4 100644
--- a/python/paddle/fluid/dygraph/dygraph_to_static/__init__.py
+++ b/python/paddle/fluid/dygraph/dygraph_to_static/__init__.py
@@ -20,6 +20,10 @@ from .ast_transformer import *
 from . import static_analysis
 from .static_analysis import *
 
+from . import cache_program
+from .cache_program import *
+
 __all__ = []
 __all__ += ast_transformer.__all__
 __all__ += static_analysis.__all__
+__all__ += cache_program.__all__
diff --git a/python/paddle/fluid/dygraph/dygraph_to_static/ast_transformer.py b/python/paddle/fluid/dygraph/dygraph_to_static/ast_transformer.py
index 73eb8ec3a98..e2e7ed45a5c 100644
--- a/python/paddle/fluid/dygraph/dygraph_to_static/ast_transformer.py
+++ b/python/paddle/fluid/dygraph/dygraph_to_static/ast_transformer.py
@@ -15,15 +15,17 @@ from __future__ import print_function
 
 from .utils import *
 import gast
+import textwrap
+import inspect
 # gast is a generic AST to represent Python2 and Python3's Abstract Syntax Tree(AST).
 # It provides a compatibility layer between the AST of various Python versions,
 # as produced by ast.parse from the standard ast module.
 # See details in https://github.com/serge-sans-paille/gast/
-from .ast_utils import is_control_flow_if, create_cond_node, transform_if_else
+from .ast_utils import is_control_flow_if, create_cond_node, transform_if_else, ast_to_func
 from paddle.fluid import unique_name
 from .static_analysis import AstNodeWrapper, StaticAnalysisVisitor
 
-__all__ = ['DygraphToStaticAst']
+__all__ = ['DygraphToStaticAst', 'convert_to_static']
 
 DECORATOR_NAMES = ['dygraph_to_static_output', 'dygraph_to_static_graph']
 
@@ -253,3 +255,22 @@ class BasicApiTransformer(gast.NodeTransformer):
 
     def get_feed_name_to_arg_id(self):
         return self.feed_name_to_arg_id
+
+
+def convert_to_static(dyfunc):
+    """
+    Converts dygraph function into static function.
+    """
+    # Get AST from dygraph function
+    raw_code = inspect.getsource(dyfunc)
+    code = textwrap.dedent(raw_code)
+    root = gast.parse(code)
+
+    # Transform AST
+    dygraph_to_static = DygraphToStaticAst()
+    root_wrapper = dygraph_to_static.get_static_ast(root)
+
+    # Get static_func from AST
+    func_name = dygraph_to_static.get_module_name()
+    static_func, file_name = ast_to_func(root_wrapper.node, func_name)
+    return static_func, dygraph_to_static
diff --git a/python/paddle/fluid/dygraph/dygraph_to_static/cache_program.py b/python/paddle/fluid/dygraph/dygraph_to_static/cache_program.py
new file mode 100644
index 00000000000..60d66b99c85
--- /dev/null
+++ b/python/paddle/fluid/dygraph/dygraph_to_static/cache_program.py
@@ -0,0 +1,345 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import inspect
+import textwrap
+import threading
+import numpy
+import six
+
+from paddle.fluid import framework
+from paddle.fluid.layers import io
+from paddle.fluid import core, executor
+from paddle.fluid.dygraph.dygraph_to_static import convert_to_static
+
+__all__ = ['AutoTracer']
+
+
+class FunctionCache(object):
+    """
+    Caches the transformed functions to avoid redundant conversions of the same function.
+    """
+
+    def __init__(self):
+        self._cache_funcs = dict()
+        self._func_to_transformer = dict()
+
+    def __call__(self, func):
+        static_func = self._get_or_cache_func(func)
+        return static_func
+
+    def _get_or_cache_func(self, func):
+
+        cache_key = self.hash_key(func)
+        static_func = self._cache_funcs.get(cache_key, None)
+
+        if static_func is None:
+            static_func, dygraph_to_static = convert_to_static(func)
+            self._cache_funcs[cache_key] = static_func
+            self._func_to_transformer[static_func] = dygraph_to_static
+
+        return static_func
+
+    def transformer(self, func):
+        return self._func_to_transformer.get(func, None)
+
+    def hash_key(self, func):
+        raw_code = inspect.getsource(func)
+        code = textwrap.dedent(raw_code)
+
+        return hash(code)
+
+    def exist(self, func):
+        return self._cache_funcs.get(self.hash_key(func), None) is not None
+
+
+def synchronized(func):
+    func.__lock__ = threading.Lock()
+
+    def lock_func(*args, **kwargs):
+        with func.__lock__:
+            return func(*args, **kwargs)
+
+    return lock_func
+
+
+class ProgramCache(object):
+    """
+    Wrapper class for the program functions defined by dygraph function.
+    """
+
+    def __init__(self):
+        self._inputs = []
+        self._outputs = []
+        # Always set program to default_main_program. Because once `__call__` is called,
+        # it means layers(or Ops) are added into default_main_program switched by outer
+        # `with` statement.
+        self._program = framework.default_main_program()
+        self._func_cache = FunctionCache()
+        # Stores the entry function of Net or Model.
+        self._forward_func = None
+        self._feed_name_to_idx = {}
+        self._is_repeated = False
+        # Indicates whether the function call is still building program.
+        # Because `__call__` can be called recursively when `Net` has
+        # sub class in `forward()`.
+        self._in_build_process = True
+
+    def __call__(self, dyfunc, *args, **kwargs):
+        """
+        Executes the main_program with specialized inputs.
+        """
+        # Transforms dygraph function into static functions and caches them.
+        static_func = self._transform_or_cache_layers(dyfunc)
+
+        # 1. Adds `fluid.data` layers for input if needed
+        if not self._inputs:
+            self._add_feed_layers(args, kwargs)
+
+        # 2. Avoids inserting forward ops repeatedly.
+        if self._is_repeated:
+            return self.outputs
+
+        # 3. Builds program only once and returns the output Variables.
+        outputs = self._get_or_build_program(static_func, args, kwargs)
+
+        if static_func == self._forward_func:
+            self._in_build_process = False
+
+        return outputs
+
+    def _transform_or_cache_layers(self, dyfunc):
+        """
+        Transforms dygraph function into static function.
+        """
+        static_func = self._func_cache(dyfunc)
+        # self._forward_func is entry function of Net or Model.
+        # It can be called for multiple times, but layers from these functions
+        # call stack will be added into self._program only once.
+        # After that, cached program will be always returned by default.
+        if static_func == self._forward_func:
+            self._is_repeated = True
+
+        if self._forward_func is None:
+            self._forward_func = static_func
+
+        return static_func
+
+    def _get_or_build_program(self, func, args, kwargs):
+        """
+        Returns program of the input function. If called at first time,
+        builds a new program and caches it.
+        """
+        with framework.program_guard(self._program):
+            if func == self._forward_func:
+                # Replaces input data with `layers.data`
+                args = list(args)
+                for feed_layer in self._inputs:
+                    idx = self.feed_name_to_idx[feed_layer.name]
+                    args[idx] = feed_layer
+                fetch_list = func(*args, **kwargs)
+                self._outputs = fetch_list
+            else:
+                fetch_list = func(*args, **kwargs)
+
+        return fetch_list
+
+    def _add_feed_layers(self, args, kwargs):
+        """
+        Adds `fluid.data` layers for the input data. If the input `numpy.ndarray`
+        were converted into `Variable` by `to_variable()`, the program would be
+        executed dynamically.
+        """
+        if not self._feed_name_to_idx:
+            self._feed_name_to_idx = self._get_name_to_idx(self._forward_func)
+        with framework.program_guard(self._program):
+            for feed_name, idx in self.feed_name_to_idx.items():
+                batch_data = args[idx]
+                assert isinstance(
+                    batch_data, numpy.ndarray
+                ), "Input {} should be numpy.ndarray, but received {}.".format(
+                    feed_name, type(batch_data))
+                feed_layer = io.data(
+                    name=feed_name,
+                    shape=list(batch_data.shape[1:]),
+                    dtype=str(batch_data.dtype))
+                self._inputs.append(feed_layer)
+
+    def _get_name_to_idx(self, func):
+        """
+        Returns name and index of input args from `forward(args)`
+        that need to be replaced with `fluid.data`.
+        """
+        transformer = self._func_cache.transformer(func)
+        feed_name_to_idx = transformer.get_feed_name_to_idx()
+        return feed_name_to_idx
+
+    @property
+    def program(self):
+        return self._program
+
+    @property
+    def inputs(self):
+        return self._inputs
+
+    @property
+    def outputs(self):
+        return self._outputs
+
+    @property
+    def feed_name_to_idx(self):
+        return self._feed_name_to_idx
+
+    @property
+    def in_build_process(self):
+        return self._in_build_process
+
+
+class AutoTracer(object):
+
+    _instance = None
+
+    @synchronized
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = object.__new__(cls, *args, **kwargs)
+            cls._instance.__initialized = False
+        return cls._instance
+
+    @classmethod
+    def get_instance(cls):
+        if cls._instance is None:
+            raise ValueError("AutoTracer hasn\'t been created!")
+        return cls._instance
+
+    @classmethod
+    def reset(cls):
+        if cls._instance is not None:
+            cls._instance.__initialized = False
+            cls._instance.__init__()
+
+    def __init__(self, exe=None, place=None):
+        # Makes sure that __init__ is called only once.
+        if self.__initialized:
+            return
+        self.__initialized = True
+        self._place = core.CPUPlace() if place is None else place
+        if exe is None:
+            self._exe = executor.Executor(self._place)
+        else:
+            self._exe = exe
+        self._cached_program = ProgramCache()
+        self._optimizer = None
+        self._already_minimized = False
+        # Once main_program is changed, should run startup_program.
+        self._need_startup = True
+
+    def run(self, *args, **kwargs):
+        """
+        Executes main_program and returns output Tensors.
+        """
+        feed_dict, fetch_list = self._prepare(args)
+
+        main_program = self._cached_program.program
+        outputs = self._exe.run(main_program,
+                                feed=feed_dict,
+                                fetch_list=fetch_list)
+
+        return outputs
+
+    def _prepare(self, args):
+        """
+        Prepares feed_dict and fetch_list, adds the optimizer, and initializes
+        vars by running startup_program.
+        """
+
+        # Updates batch_data for feed_dict
+        feed_dict = self._update_batch_data(args)
+        fetch_list = self._cached_program.outputs
+
+        # Adds optimizer if needed.
+        if self._optimizer and not self._already_minimized:
+            self._add_optimizer()
+
+        if self._need_startup:
+            self._exe.run(framework.default_startup_program())
+            self._need_startup = False
+
+        return feed_dict, fetch_list
+
+    def _check_cache_valid(self):
+        """
+        Checks whether the current program is consistent with `default_main_program`.
+        In some models and unittests, the program will be switched frequently by
+        `program_guard`. If so, the cached program and other properties are not
+        available and should be reset.
+        """
+        if self._cached_program.program:
+            if self._cached_program.program != framework.default_main_program():
+                AutoTracer.reset()
+
+    def _update_batch_data(self, args):
+        """
+        Updates cached batch data while training program.
+        """
+        feed_name_to_idx = self._cached_program.feed_name_to_idx
+        feed_vars = self._cached_program.inputs
+        feed_dict = {}
+        for feed_var in feed_vars:
+            idx = feed_name_to_idx[feed_var.name]
+            feed_dict[feed_var.name] = args[idx]
+
+        return feed_dict
+
+    def set_optimizer(self, optimizer, loss_name):
+        """
+        Supports to set or update the optimizer used to minimize loss.
+        """
+        self._check_cache_valid()
+        self._optimizer = optimizer
+
+        if not isinstance(loss_name, six.string_types):
+            raise ValueError(
+                "Type of input loss_name should be str, but received {}."
+                .format(type(loss_name)))
+        self._loss_name = loss_name
+
+    def _add_optimizer(self):
+        """
+        Adds optimizer ops to minimize the loss in the cached main_program.
+        """
+        main_program = self._cached_program.program
+        all_vars = main_program.block(0).vars
+        loss_var = all_vars.get(self._loss_name, None)
+
+        if loss_var is None:
+            raise ValueError(
+                "Can't find {} in main_program, please confirm whether the loss input is correct"
+                .format(self._loss_name))
+        # Adds optimizer to minimize loss
+        with framework.program_guard(main_program):
+            self._optimizer.minimize(loss_var)
+
+        # Avoids setting optimizer repeatedly.
+        self._already_minimized = True
+
+    def get_cached_program(self):
+        """
+        Returns the ProgramCache instance.
+        """
+        self._check_cache_valid()
+        return self._cached_program
+
+    @property
+    def program(self):
+        return self._cached_program.program
diff --git a/python/paddle/fluid/dygraph/jit.py b/python/paddle/fluid/dygraph/jit.py
index 423c6702881..78a6385e159 100644
--- a/python/paddle/fluid/dygraph/jit.py
+++ b/python/paddle/fluid/dygraph/jit.py
@@ -16,21 +16,16 @@ from __future__ import print_function
 
 __all__ = ['TracedLayer', 'dygraph_to_static_output', 'dygraph_to_static_graph']
 
-import gast
-import inspect
-import textwrap
 import warnings
 
 from ..wrapped_decorator import wrap_decorator
 from .base import program_desc_tracing_guard, switch_to_static_graph
-from .dygraph_to_static import DygraphToStaticAst
-from .dygraph_to_static.ast_utils import ast_to_func
+from .dygraph_to_static import AutoTracer, convert_to_static
 from .layers import Layer
 from paddle.fluid import core
 from paddle.fluid.framework import Program, Block, Variable, _dygraph_tracer, dygraph_only, _dygraph_guard, _current_expected_place, in_dygraph_mode
 from paddle.fluid.executor import Executor, scope_guard
 from paddle.fluid.compiler import CompiledProgram
-from paddle.fluid import program_guard, data, default_startup_program, default_main_program
 
 
 def create_program_from_desc(program_desc):
@@ -56,23 +51,6 @@ def extract_vars(inputs):
     return result_list
 
 
-def to_static_func(dygraph_func):
-    # Get AST from dygraph function
-    dygraph_code = inspect.getsource(dygraph_func)
-    dygraph_code = textwrap.dedent(dygraph_code)
-    root = gast.parse(dygraph_code)
-
-    # Transform AST
-    dygraph_to_static = DygraphToStaticAst()
-    root_wrapper = dygraph_to_static.get_static_ast(root)
-
-    # Get static_func from AST
-    func_name = dygraph_to_static.get_module_name()
-    static_func, file_name = ast_to_func(root_wrapper.node, func_name)
-
-    return static_func, dygraph_to_static
-
-
 def _dygraph_to_static_graph_(dygraph_func):
     def __impl__(*args, **kwargs):
         if in_dygraph_mode():
@@ -80,13 +58,20 @@ def _dygraph_to_static_graph_(dygraph_func):
                 "The decorator 'dygraph_to_static_graph' doesn't work in dygraph mode."
                 " Please use it in static mode.")
             return dygraph_func(*args, **kwargs)
-        static_func, dygraph_to_static = to_static_func(dygraph_func)
+        static_func, ast_transformer = convert_to_static(dygraph_func)
         return static_func(*args, **kwargs)
 
     return __impl__
 
 
+dygraph_to_static_graph = wrap_decorator(_dygraph_to_static_graph_)
+
+
 def _dygraph_to_static_output_(dygraph_func):
+    # Singleton object to cache main_program to avoid inserting ops repeatedly.
+    # TODO: Need a better class name
+    auto_tracer = AutoTracer()
+
     def __impl__(*args, **kwargs):
         if in_dygraph_mode():
             warnings.warn(
@@ -94,45 +79,19 @@ def _dygraph_to_static_output_(dygraph_func):
                 " Please use it in static mode.")
             return dygraph_func(*args, **kwargs)
 
-        static_func, dygraph_to_static = to_static_func(dygraph_func)
-        feed_name_to_idx = dygraph_to_static.get_feed_name_to_idx()
-        feed_dict = {}
-        for feed_name, idx in feed_name_to_idx.items():
-            feed_dict[feed_name] = args[idx]
-
-        # Run static_func in static mode
-        startup_program = default_main_program()
-        main_program = default_startup_program()
-        static_res = run_static_func(main_program, startup_program, static_func,
-                                     args, kwargs, feed_dict, feed_name_to_idx)
-        return static_res
-
-    return __impl__
+        cached_program = auto_tracer.get_cached_program()
+        outputs = cached_program(dygraph_func, *args, **kwargs)
+        # Run program to fetch output Tensors once building successfully.
+        if not cached_program.in_build_process:
+            outputs = auto_tracer.run(*args, **kwargs)
 
-def run_static_func(main_program, startup_program, static_func, args, kwargs,
-                    feed_dict, feed_name_to_idx):
+        return outputs
 
-    with program_guard(main_program, startup_program):
-        args_list = list(args)
-        for var_name, value in feed_dict.items():
-            idx = feed_name_to_idx[var_name]
-            args_list[idx] = data(
-                name=var_name, shape=value.shape, dtype=str(value.dtype))
-        args = tuple(args_list)
-        static_out = static_func(*args, **kwargs)
-        if not isinstance(static_out, (list, tuple)):
-            static_out = [static_out]
-        exe = Executor(core.CPUPlace())
-        exe.run(startup_program)
-        static_res = exe.run(main_program,
-                             fetch_list=static_out,
-                             feed=feed_dict)
-        return static_res
+    return __impl__
 
 
 dygraph_to_static_output = wrap_decorator(_dygraph_to_static_output_)
-dygraph_to_static_graph = wrap_decorator(_dygraph_to_static_graph_)
 
 
 @dygraph_only
@@ -394,11 +353,11 @@ class TracedLayer(object):
             in_var = to_variable(in_np)
             out_dygraph, static_layer = TracedLayer.trace(layer, inputs=[in_var])
             static_layer.save_inference_model(save_dirname, feed=[0], fetch=[0])
-
-            place = fluid.CPUPlace()
+
+            place = fluid.CPUPlace()
             exe = fluid.Executor(place)
             program, feed_vars, fetch_vars = fluid.io.load_inference_model(save_dirname,
-                                                                exe)
+                                                                exe)
             fetch, = exe.run(program, feed={feed_vars[0]: in_np}, fetch_list=fetch_vars)
             print(fetch.shape) # (2, 10)
 
diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_cache_program.py b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_cache_program.py
new file mode 100644
index 00000000000..dd51b00a912
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_cache_program.py
@@ -0,0 +1,115 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import numpy as np
+from collections import Counter
+
+import paddle.fluid as fluid
+
+from paddle.fluid.dygraph.dygraph_to_static import AutoTracer
+from paddle.fluid.dygraph.jit import dygraph_to_static_output
+
+from test_fetch_feed import Pool2D, Linear
+
+
+class TestCacheProgram(unittest.TestCase):
+    def setUp(self):
+        self.batch_num = 5
+        self.dygraph_class = Pool2D
+        self.data = np.random.random((1, 2, 4, 4)).astype('float32')
+
+    def test_cache(self):
+        prev_ops, cur_ops = Counter(), Counter()
+        prev_out, cur_out = None, None
+        main_program = fluid.Program()
+        with fluid.program_guard(main_program):
+            static_net = self.dygraph_class()
+            for batch_id in range(self.batch_num):
+                out = static_net(self.data)
+                # Check outputs
+                prev_out = cur_out
+                cur_out = out
+                # Check forward ops
+                prev_ops = cur_ops
+                cur_ops = Counter([
+                    op.type for op in fluid.default_main_program().block(0).ops
+                ])
+                if batch_id > 0:
+                    self.assertTrue(
+                        np.allclose(prev_out[0], cur_out[0]),
+                        msg='Output in previous batch is {}\n Output in current batch is \n{}'
+                        .format(prev_out, cur_out))
+                    self.assertEqual(prev_ops, cur_ops)
+
+
+class TestCacheProgram2(TestCacheProgram):
+    def setUp(self):
+        self.batch_num = 5
+        self.dygraph_class = Linear
+        self.data = np.random.random((4, 10)).astype('float32')
+
+
+class TestCacheProgramWithOptimizer(unittest.TestCase):
+    def setUp(self):
+        self.dygraph_class = Linear
+        self.data = np.random.random((4, 10)).astype('float32')
+        self.batch_num = 5
+
+    def train_static(self):
+        main_program = fluid.Program()
+        loss_data = []
+        with fluid.program_guard(main_program):
+            static_net = self.dygraph_class()
+            adam = fluid.optimizer.AdamOptimizer(learning_rate=0.001)
+            # set optimizer
+            # TODO: Need a better interface to set optimizer.
+            auto_tracer = AutoTracer()
+            auto_tracer.set_optimizer(adam, 'avg_loss')
+
+            for batch_id in range(self.batch_num):
+                pred, avg_loss = static_net(self.data)
+                loss_data.append(np.array(avg_loss))
+
+        return loss_data
+
+    def train_dygraph(self):
+        with fluid.dygraph.guard(fluid.CPUPlace()):
+            dygraph_net = self.dygraph_class()
+            adam = fluid.optimizer.AdamOptimizer(
+                learning_rate=0.001, parameter_list=dygraph_net.parameters())
+            loss_data = []
+            for batch_id in range(self.batch_num):
+                pred, avg_loss = dygraph_net(self.data)
+
+                loss_data.append(avg_loss.numpy())
+                avg_loss.backward()
+                adam.minimize(avg_loss)
+                dygraph_net.clear_gradients()
+
+        return loss_data
+
+    def test_with_optimizer(self):
+        dygraph_loss = self.train_dygraph()
+        static_loss = self.train_static()
+        self.assertTrue(
+            np.allclose(dygraph_loss, static_loss),
+            msg='dygraph is {}\n static_res is \n{}'.format(dygraph_loss,
+                                                            static_loss))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_fetch_feed.py b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_fetch_feed.py
index 0c5ffba32a5..5dc806cd59a 100644
--- a/python/paddle/fluid/tests/unittests/dygraph_to_static/test_fetch_feed.py
+++ b/python/paddle/fluid/tests/unittests/dygraph_to_static/test_fetch_feed.py
@@ -58,7 +58,8 @@ class Linear(fluid.dygraph.Layer):
     def forward(self, x):
         inputs = fluid.dygraph.to_variable(x)
         pre = self.fc(inputs)
-        return pre
+        loss = fluid.layers.mean(pre, name='avg_loss')
+        return pre, loss
 
 
 class TestPool2D(unittest.TestCase):
@@ -69,10 +70,11 @@ class TestPool2D(unittest.TestCase):
     def run_dygraph_mode(self):
         with fluid.dygraph.guard():
             dy_layer = self.dygraph_class()
-            for _ in range(1):
+            prediction = dy_layer(x=self.data)
+            if isinstance(prediction, (list, tuple)):
+                prediction = prediction[0]
 
-                prediction = dy_layer(x=self.data)
-                return prediction.numpy()
+            return prediction.numpy()
 
     def run_static_mode(self):
         startup_prog = fluid.Program()
@@ -90,7 +92,6 @@ class TestPool2D(unittest.TestCase):
             np.allclose(dygraph_res, static_res),
             msg='dygraph_res is {}\n static_res is \n{}'.format(dygraph_res,
                                                                 static_res))
-        return
 
 
 class TestLinear(TestPool2D):
-- 
GitLab
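
A minimal usage sketch of the cached-program path added by this patch, mirroring TestCacheProgramWithOptimizer in test_cache_program.py. It assumes that `Linear` from test_fetch_feed.py decorates its `forward` with `@dygraph_to_static_output` (that decoration lies outside the hunks shown above) and that the snippet runs from the dygraph_to_static unittest directory so `test_fetch_feed` is importable:

    import numpy as np
    import paddle.fluid as fluid
    from paddle.fluid.dygraph.dygraph_to_static import AutoTracer
    from test_fetch_feed import Linear  # assumes the unittest directory is on the path

    data = np.random.random((4, 10)).astype('float32')
    main_program = fluid.Program()
    with fluid.program_guard(main_program):
        static_net = Linear()
        adam = fluid.optimizer.AdamOptimizer(learning_rate=0.001)
        # 'avg_loss' is the name given to the mean loss in Linear.forward().
        AutoTracer().set_optimizer(adam, 'avg_loss')
        for batch_id in range(5):
            # The first call converts forward() to a static function, adds feed
            # layers and builds main_program; later calls reuse the cached program
            # and only execute it through AutoTracer.run().
            pred, avg_loss = static_net(data)
            print(batch_id, np.array(avg_loss))

Because ProgramCache builds the forward ops only once, the op count of main_program stays constant across batches, which is what TestCacheProgram asserts.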