diff --git a/python/paddle/distributed/auto_parallel/hepler.py b/python/paddle/distributed/auto_parallel/hepler.py
index d85489daf64caccae1a6a76c07de6bb0e56c951e..077b769116060c3973931c9e1f91d621994e3791 100644
--- a/python/paddle/distributed/auto_parallel/hepler.py
+++ b/python/paddle/distributed/auto_parallel/hepler.py
@@ -13,11 +13,14 @@
 # limitations under the License.
 
 import logging
+from collections import defaultdict
 
 from paddle.nn import Layer
 from paddle.jit import to_static, not_to_static
 from paddle.distributed.utils import get_logger
 from paddle.fluid.framework import Operator, Parameter, _non_static_mode
+from paddle.fluid.framework import program_guard
+from paddle.fluid.dygraph.dygraph_to_static.program_translator import StaticFunction
 
 from .utils import to_list
 
@@ -39,29 +42,30 @@ class ProxyLayer(Layer):
         self.mode = None
 
         # generated program vars
-        self.input_vars = []
-        self.label_vars = []
-        self.output_vars = []
-        self.loss_vars = []
-        self.metric_vars = []
+        self._input_vars = defaultdict(list)
+        self._label_vars = defaultdict(list)
+        self._output_vars = defaultdict(list)
+        self._loss_vars = defaultdict(list)
+        self._metric_vars = defaultdict(list)
 
     def _train(self, inputs, labels):
         """
         Train process of inner_layer with forward/loss/metric logic.
         """
         # step 1. save feed variables of Program
-        self.input_vars = inputs
-        self.label_vars = labels
+        mode = 'train'
+        self._input_vars[mode] = inputs
+        self._label_vars[mode] = labels
 
         # step 2. call inner_layer.forward
-        self.output_vars = self.inner_layer(*inputs)
+        self._output_vars[mode] = self.inner_layer(*inputs)
 
         # step 3. calculate loss if needed
         new_inputs = self._prepare(self.output_vars, labels)
-        self.loss_vars = self.call_loss(new_inputs)
+        self._loss_vars[mode] = self.call_loss(new_inputs)
 
         # step 4. calculate metrics if needed
-        self.metric_vars = self.call_metrics(new_inputs)
+        self._metric_vars[mode] = self.call_metrics(new_inputs)
 
     def _eval(self, inputs, labels):
         """
@@ -71,28 +75,30 @@ class ProxyLayer(Layer):
         # sure if they can.
 
         # step 1. save feed variables of Program
-        self.input_vars = inputs
-        self.label_vars = labels
+        mode = 'eval'
+        self._input_vars[mode] = inputs
+        self._label_vars[mode] = labels
 
         # step 2. call inner_layer.forward
-        self.output_vars = self.inner_layer(*inputs)
+        self._output_vars[mode] = self.inner_layer(*inputs)
 
         # step 3. calculate loss if needed
         new_inputs = self._prepare(self.output_vars, labels)
-        self.loss_vars = self.call_loss(new_inputs)
+        self._loss_vars[mode] = self.call_loss(new_inputs)
 
         # step 4. calculate metrics if needed
-        self.metric_vars = self.call_metrics(new_inputs)
+        self._metric_vars[mode] = self.call_metrics(new_inputs)
 
     def _predict(self, inputs):
         """
         Predict process of inner_layer with forward logic.
         """
         # step 1. save feed variables of Program
-        self.input_vars = inputs
+        mode = 'predict'
+        self._input_vars[mode] = inputs
 
         # step 2. call inner_layer.forward
-        self.output_vars = self.inner_layer(*inputs)
+        self._output_vars[mode] = self.inner_layer(*inputs)
 
     @not_to_static
     def _prepare(self, outputs, labels):
@@ -136,15 +142,46 @@ class ProxyLayer(Layer):
         self.mode = mode
         self.training = mode == 'train'
 
+    def clone(self):
+        return ProxyLayer(self.inner_layer, self.loss_func, self.metrics)
+
+    @property
+    def input_vars(self):
+        return self._input_vars[self.mode]
+
+    @property
+    def label_vars(self):
+        return self._label_vars[self.mode]
+
+    @property
+    def output_vars(self):
+        return self._output_vars[self.mode]
+
+    @property
+    def loss_vars(self):
+        return self._loss_vars[self.mode]
+
+    @property
+    def metric_vars(self):
+        return self._metric_vars[self.mode]
+
 
 class BuildInfo:
 
-    def __init__(self, mode=None, state=False):
-        self.mode = mode
-        self.state = state
+    def __init__(self):
+        self.clear()
+
+    def has_cache(self, mode, update=False):
+        is_cache = self.states[mode]
+        if update:
+            self.cache(mode)
+        return is_cache
 
-    def has_cache(self, mode):
-        return self.mode == mode and self.state is True
+    def cache(self, mode):
+        self.states[mode] = True
+
+    def clear(self):
+        self.states = defaultdict(bool)
 
 
 class ProgramHelper(object):
@@ -163,20 +200,27 @@
         self.build_info = BuildInfo()
         self._logger = get_logger(logging.INFO)
 
+    def reset(self):
+        """
+        Reset all states of the current object.
+        """
+        self.build_info.clear()
+        self.proxy_layer = self.proxy_layer.clone()
+
     def build_program(self, mode):
         """
         Convert dygraph model into static Program IR.
         """
         assert mode in ['train', 'eval', 'predict']
+        self.proxy_layer.set_mode(mode)
         # skip if we has already built program.
-        if self.build_info.has_cache(mode):
+        if self.build_info.has_cache(mode, True):
             self._logger.info(
                 "Already build program with mode = %s, use cached program." %
                 mode)
             return
 
         self._logger.info("start to build program for mode = %s." % mode)
-        self.proxy_layer.mode = mode
         input_spec = [self.inputs_spec, self.labels_spec
                       ] if mode != 'predict' else [self.inputs_spec]
         static_func = to_static(self.static_func(), input_spec=input_spec)
@@ -188,6 +232,8 @@
         # generating Program IR immediately.
         getattr(self.proxy_layer, func_name).concrete_program
 
+        self._build_startup_program()
+
     def _build_startup_program(self):
         """
         Create and Sync parameters into startup program.
@@ -201,9 +247,46 @@
                 stop_gradient=param.stop_gradient,
                 block=self.startup_program.global_block())
 
+    def apply_optimizer(self, optimizer):
+        """
+        Append backward and generate optimizer operations.
+        """
+        self._verify_optimizer(optimizer)
+        self._logger.info("start to apply optimizer: %s",
+                          type(optimizer).__name__)
+        # clear optimizer parameters
+        original_params = optimizer._parameter_list
+        optimizer._parameter_list = None
+        with program_guard(self.main_program, self.startup_program):
+            res = optimizer.minimize(self.loss_vars[0])
+
+        # restore optimizer parameters
+        optimizer._parameter_list = original_params
+        return res
+
+    def _verify_optimizer(self, optimizer):
+        assert optimizer is not None
+        assert hasattr(optimizer,
+                       "minimize"), "Optimizer must have minimize() method."
+        assert self.proxy_layer.mode == 'train', "Required mode == 'train', but received '%s'" % self.proxy_layer.mode
+        assert len(
+            self.loss_vars
+        ) == 1, "Required len(loss_vars) == 1, but received len(loss_vars) = %s" % len(
+            self.loss_vars)
+
+    def to(self, mode):
+        """
+        Switch the underlying proxy layer into the target mode.
+        """
+        assert mode in ['train', 'eval', 'predict']
+        func = getattr(self.proxy_layer, '_' + mode)
+        assert isinstance(
+            func, StaticFunction), "Please call build_program(mode) first."
+        self.proxy_layer.set_mode(mode)
+
     def static_func(self):
         """
-        Return target mode function.
+        Return the StaticFunction instance for the underlying target mode.
         """
         assert self.proxy_layer.mode in [
             'train', 'eval', 'predict'
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_to_static.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_to_static.py
index 4e4fb9b5825ed8c86a52ba2eae666d47e9f4447a..a3ab87160da6825c3d85bf8023a98e284c907a32 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/test_to_static.py
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_to_static.py
@@ -27,6 +27,7 @@ from paddle.io import Dataset
 from paddle.static import InputSpec
 from paddle.fluid.framework import _non_static_mode
 from paddle.distributed.auto_parallel.engine import Engine
+from paddle.distributed.auto_parallel.hepler import ProgramHelper
 
 batch_size = 4
 batch_num = 30
@@ -85,6 +86,45 @@ class MLPLayer(nn.Layer):
 
         return out
 
+class TestWholeProgram(unittest.TestCase):
+
+    def test_apply_optimizer(self):
+        paddle.disable_static()
+        mlp = MLPLayer(hidden_size=hidden_size,
+                       intermediate_size=4 * hidden_size,
+                       dropout_ratio=0.1,
+                       initializer_range=0.02)
+        metrics = paddle.metric.Accuracy()
+        loss = paddle.nn.CrossEntropyLoss()
+        optimizer = paddle.optimizer.SGD(learning_rate=0.00001,
+                                         parameters=mlp.parameters())
+        inputs = InputSpec([batch_size, hidden_size], 'float32', 'x')
+        labels = InputSpec([batch_size], 'int64', 'label')
+
+        program_helper = ProgramHelper(mlp, loss, [metrics], [inputs], [labels])
+        paddle.enable_static()
+        # step 1: build program
+        program_helper.build_program(mode='train')
+        program_helper.build_program(mode='eval')
+        # easily switch between modes
+        program_helper.to('train')
+
+        forward_ops = program_helper.main_program.block(0).ops
+        self.assertEqual(len(forward_ops), 21)
+
+        # step 2: apply optimizer to generate whole program
+        optimize_ops, _ = program_helper.apply_optimizer(optimizer)
+        all_ops = program_helper.main_program.block(0).ops
+        sgd_ops = [
+            op for op in program_helper.main_program.block(0).ops
+            if op.type == 'sgd'
+        ]
+        self.assertEqual(len(all_ops), 41)
+        self.assertEqual(len(optimize_ops), len(sgd_ops))
+
+        program_helper.reset()
+
+
 class TestToStatic(unittest.TestCase):
 
     def test_to_static(self):