# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import division from __future__ import print_function import unittest import random import numpy as np import paddle import paddle.fluid as fluid from paddle.fluid.dygraph import Embedding, Linear, Layer from paddle.fluid.layers import BeamSearchDecoder from paddle.incubate.hapi import Model, Input, set_device from paddle.incubate.hapi.text import * class ModuleApiTest(unittest.TestCase): @classmethod def setUpClass(cls): cls._np_rand_state = np.random.get_state() cls._py_rand_state = random.getstate() cls._random_seed = 123 np.random.seed(cls._random_seed) random.seed(cls._random_seed) cls.model_cls = type(cls.__name__ + "Model", (Layer, ), { "__init__": cls.model_init_wrapper(cls.model_init), "forward": cls.model_forward }) @classmethod def tearDownClass(cls): np.random.set_state(cls._np_rand_state) random.setstate(cls._py_rand_state) @staticmethod def model_init_wrapper(func): def __impl__(self, *args, **kwargs): Layer.__init__(self) func(self, *args, **kwargs) return __impl__ @staticmethod def model_init(model, *args, **kwargs): raise NotImplementedError( "model_init acts as `Model.__init__`, thus must implement it") @staticmethod def model_forward(model, *args, **kwargs): return model.module(*args, **kwargs) def make_inputs(self): # TODO(guosheng): add default from `self.inputs` raise NotImplementedError( "model_inputs makes inputs for model, thus must implement it") def setUp(self): """ For the model which wraps the module to be tested: Set input data by `self.inputs` list Set init argument values by `self.attrs` list/dict Set model parameter values by `self.param_states` dict Set expected output data by `self.outputs` list We can create a model instance and run once with these. """ self.inputs = [] self.attrs = {} self.param_states = {} self.outputs = [] def _calc_output(self, place, mode="test", dygraph=True): if dygraph: fluid.enable_dygraph(place) else: fluid.disable_dygraph() gen = paddle.manual_seed(self._random_seed) gen._is_init_py = False paddle.framework.random._manual_program_seed(self._random_seed) scope = fluid.core.Scope() with fluid.scope_guard(scope): layer = self.model_cls(**self.attrs) if isinstance( self.attrs, dict) else self.model_cls(*self.attrs) model = Model(layer, inputs=self.make_inputs()) model.prepare() if self.param_states: model.load(self.param_states, optim_state=None) return model.test_batch(self.inputs) def check_output_with_place(self, place, mode="test"): dygraph_output = self._calc_output(place, mode, dygraph=True) stgraph_output = self._calc_output(place, mode, dygraph=False) expect_output = getattr(self, "outputs", None) for actual_t, expect_t in zip(dygraph_output, stgraph_output): self.assertTrue(np.allclose(actual_t, expect_t, rtol=1e-5, atol=0)) if expect_output: for actual_t, expect_t in zip(dygraph_output, expect_output): self.assertTrue( np.allclose( actual_t, expect_t, rtol=1e-5, atol=0)) def check_output(self): devices = ["CPU", "GPU"] if fluid.is_compiled_with_cuda() else ["CPU"] for device in devices: place = set_device(device) self.check_output_with_place(place) class TestBasicLSTM(ModuleApiTest): def setUp(self): # TODO(guosheng): Change to big size. Currently bigger hidden size for # LSTM would fail, the second static graph run might get diff output # with others. shape = (2, 4, 16) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 16, "hidden_size": 16} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size): model.lstm = RNN(BasicLSTMCell( input_size, hidden_size, )) @staticmethod def model_forward(model, inputs): return model.lstm(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() class TestBasicGRU(ModuleApiTest): def setUp(self): shape = (2, 4, 128) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 128, "hidden_size": 128} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size): model.gru = RNN(BasicGRUCell(input_size, hidden_size)) @staticmethod def model_forward(model, inputs): return model.gru(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() class TestBeamSearch(ModuleApiTest): def setUp(self): shape = (8, 32) self.inputs = [ np.random.random(shape).astype("float32"), np.random.random(shape).astype("float32") ] self.outputs = None self.attrs = { "vocab_size": 100, "embed_dim": 32, "hidden_size": 32, } self.param_states = {} @staticmethod def model_init(self, vocab_size, embed_dim, hidden_size, bos_id=0, eos_id=1, beam_size=4, max_step_num=20): embedder = Embedding(size=[vocab_size, embed_dim]) output_layer = Linear(hidden_size, vocab_size) cell = BasicLSTMCell(embed_dim, hidden_size) decoder = BeamSearchDecoder( cell, start_token=bos_id, end_token=eos_id, beam_size=beam_size, embedding_fn=embedder, output_fn=output_layer) self.beam_search_decoder = DynamicDecode( decoder, max_step_num=max_step_num, is_test=True) @staticmethod def model_forward(model, init_hidden, init_cell): return model.beam_search_decoder([init_hidden, init_cell])[0] def make_inputs(self): inputs = [ Input([None, self.inputs[0].shape[-1]], "float32", "init_hidden"), Input([None, self.inputs[1].shape[-1]], "float32", "init_cell"), ] return inputs def test_check_output(self): self.check_output() class TestTransformerEncoder(ModuleApiTest): def setUp(self): self.inputs = [ # encoder input: [batch_size, seq_len, hidden_size] np.random.random([2, 4, 512]).astype("float32"), # self attention bias: [batch_size, n_head, seq_len, seq_len] np.random.randint(0, 1, [2, 8, 4, 4]).astype("float32") * -1e9 ] self.outputs = None self.attrs = { "n_layer": 2, "n_head": 8, "d_key": 64, "d_value": 64, "d_model": 512, "d_inner_hid": 1024 } self.param_states = {} @staticmethod def model_init(model, n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout=0.1, attention_dropout=0.1, relu_dropout=0.1, preprocess_cmd="n", postprocess_cmd="da", ffn_fc1_act="relu"): model.encoder = TransformerEncoder( n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout, attention_dropout, relu_dropout, preprocess_cmd, postprocess_cmd, ffn_fc1_act) @staticmethod def model_forward(model, enc_input, attn_bias): return model.encoder(enc_input, attn_bias) def make_inputs(self): inputs = [ Input([None, None, self.inputs[0].shape[-1]], "float32", "enc_input"), Input([None, self.inputs[1].shape[1], None, None], "float32", "attn_bias"), ] return inputs def test_check_output(self): self.check_output() class TestTransformerDecoder(TestTransformerEncoder): def setUp(self): self.inputs = [ # decoder input: [batch_size, seq_len, hidden_size] np.random.random([2, 4, 512]).astype("float32"), # encoder output: [batch_size, seq_len, hidden_size] np.random.random([2, 5, 512]).astype("float32"), # self attention bias: [batch_size, n_head, seq_len, seq_len] np.random.randint(0, 1, [2, 8, 4, 4]).astype("float32") * -1e9, # cross attention bias: [batch_size, n_head, seq_len, seq_len] np.random.randint(0, 1, [2, 8, 4, 5]).astype("float32") * -1e9 ] self.outputs = None self.attrs = { "n_layer": 2, "n_head": 8, "d_key": 64, "d_value": 64, "d_model": 512, "d_inner_hid": 1024 } self.param_states = {} @staticmethod def model_init(model, n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout=0.1, attention_dropout=0.1, relu_dropout=0.1, preprocess_cmd="n", postprocess_cmd="da"): model.decoder = TransformerDecoder( n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout, attention_dropout, relu_dropout, preprocess_cmd, postprocess_cmd) @staticmethod def model_forward(model, dec_input, enc_output, self_attn_bias, cross_attn_bias, caches=None): return model.decoder(dec_input, enc_output, self_attn_bias, cross_attn_bias, caches) def make_inputs(self): inputs = [ Input([None, None, self.inputs[0].shape[-1]], "float32", "dec_input"), Input([None, None, self.inputs[0].shape[-1]], "float32", "enc_output"), Input([None, self.inputs[-1].shape[1], None, None], "float32", "self_attn_bias"), Input([None, self.inputs[-1].shape[1], None, None], "float32", "cross_attn_bias"), ] return inputs def test_check_output(self): self.check_output() class TestTransformerBeamSearchDecoder(ModuleApiTest): def setUp(self): self.inputs = [ # encoder output: [batch_size, seq_len, hidden_size] np.random.random([2, 5, 128]).astype("float32"), # cross attention bias: [batch_size, n_head, seq_len, seq_len] np.random.randint(0, 1, [2, 2, 1, 5]).astype("float32") * -1e9 ] self.outputs = None self.attrs = { "vocab_size": 100, "n_layer": 2, "n_head": 2, "d_key": 64, "d_value": 64, "d_model": 128, "d_inner_hid": 128 } self.param_states = {} @staticmethod def model_init(model, vocab_size, n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout=0.1, attention_dropout=0.1, relu_dropout=0.1, preprocess_cmd="n", postprocess_cmd="da", bos_id=0, eos_id=1, beam_size=4, max_step_num=20): model.beam_size = beam_size def embeder_init(self, size): Layer.__init__(self) self.embedder = Embedding(size) Embedder = type("Embedder", (Layer, ), { "__init__": embeder_init, "forward": lambda self, word, pos: self.embedder(word) }) embedder = Embedder(size=[vocab_size, d_model]) output_layer = Linear(d_model, vocab_size) model.decoder = TransformerDecoder( n_layer, n_head, d_key, d_value, d_model, d_inner_hid, prepostprocess_dropout, attention_dropout, relu_dropout, preprocess_cmd, postprocess_cmd) transformer_cell = TransformerCell(model.decoder, embedder, output_layer) model.beam_search_decoder = DynamicDecode( TransformerBeamSearchDecoder( transformer_cell, bos_id, eos_id, beam_size, var_dim_in_state=2), max_step_num, is_test=True) @staticmethod def model_forward(model, enc_output, trg_src_attn_bias): caches = model.decoder.prepare_incremental_cache(enc_output) enc_output = TransformerBeamSearchDecoder.tile_beam_merge_with_batch( enc_output, model.beam_size) trg_src_attn_bias = TransformerBeamSearchDecoder.tile_beam_merge_with_batch( trg_src_attn_bias, model.beam_size) static_caches = model.decoder.prepare_static_cache(enc_output) rs, _ = model.beam_search_decoder( inits=caches, enc_output=enc_output, trg_src_attn_bias=trg_src_attn_bias, static_caches=static_caches) return rs def make_inputs(self): inputs = [ Input([None, None, self.inputs[0].shape[-1]], "float32", "enc_output"), Input([None, self.inputs[1].shape[1], None, None], "float32", "trg_src_attn_bias"), ] return inputs def test_check_output(self): self.check_output() class TestSequenceTagging(ModuleApiTest): def setUp(self): self.inputs = [ np.random.randint(0, 100, (2, 8)).astype("int64"), np.random.randint(1, 8, (2)).astype("int64"), np.random.randint(0, 5, (2, 8)).astype("int64") ] self.outputs = None self.attrs = {"vocab_size": 100, "num_labels": 5} self.param_states = {} @staticmethod def model_init(model, vocab_size, num_labels, word_emb_dim=128, grnn_hidden_dim=128, emb_learning_rate=0.1, crf_learning_rate=0.1, bigru_num=2, init_bound=0.1): model.tagger = SequenceTagging(vocab_size, num_labels, word_emb_dim, grnn_hidden_dim, emb_learning_rate, crf_learning_rate, bigru_num, init_bound) @staticmethod def model_forward(model, word, lengths, target=None): return model.tagger(word, lengths, target) def make_inputs(self): inputs = [ Input([None, None], "int64", "word"), Input([None], "int64", "lengths"), Input([None, None], "int64", "target"), ] return inputs def test_check_output(self): self.check_output() class TestSequenceTaggingInfer(TestSequenceTagging): def setUp(self): super(TestSequenceTaggingInfer, self).setUp() self.inputs = self.inputs[:2] # remove target def make_inputs(self): inputs = super(TestSequenceTaggingInfer, self).make_inputs()[:2] # remove target return inputs class TestStackedRNN(ModuleApiTest): def setUp(self): shape = (2, 4, 16) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 16, "hidden_size": 16, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size, num_layers): cells = [ BasicLSTMCell(input_size, hidden_size), BasicLSTMCell(hidden_size, hidden_size) ] stacked_cell = StackedRNNCell(cells) model.lstm = RNN(stacked_cell) @staticmethod def model_forward(self, inputs): return self.lstm(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() class TestLSTM(ModuleApiTest): def setUp(self): shape = (2, 4, 16) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 16, "hidden_size": 16, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size, num_layers): model.lstm = LSTM(input_size, hidden_size, num_layers=num_layers) @staticmethod def model_forward(model, inputs): return model.lstm(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() class TestBiLSTM(ModuleApiTest): def setUp(self): shape = (2, 4, 16) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 16, "hidden_size": 16, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size, num_layers, merge_mode="concat", merge_each_layer=False): model.bilstm = BidirectionalLSTM( input_size, hidden_size, num_layers=num_layers, merge_mode=merge_mode, merge_each_layer=merge_each_layer) @staticmethod def model_forward(model, inputs): return model.bilstm(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output_merge0(self): self.check_output() def test_check_output_merge1(self): self.attrs["merge_each_layer"] = True self.check_output() class TestGRU(ModuleApiTest): def setUp(self): shape = (2, 4, 64) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 64, "hidden_size": 128, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size, num_layers): model.gru = GRU(input_size, hidden_size, num_layers=num_layers) @staticmethod def model_forward(model, inputs): return model.gru(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() class TestBiGRU(ModuleApiTest): def setUp(self): shape = (2, 4, 64) self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"input_size": 64, "hidden_size": 128, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, input_size, hidden_size, num_layers, merge_mode="concat", merge_each_layer=False): model.bigru = BidirectionalGRU( input_size, hidden_size, num_layers=num_layers, merge_mode=merge_mode, merge_each_layer=merge_each_layer) @staticmethod def model_forward(model, inputs): return model.bigru(inputs)[0] def make_inputs(self): inputs = [ Input([None, None, self.inputs[-1].shape[-1]], "float32", "input"), ] return inputs def test_check_output_merge0(self): self.check_output() def test_check_output_merge1(self): self.attrs["merge_each_layer"] = True self.check_output() class TestCNNEncoder(ModuleApiTest): def setUp(self): shape = (2, 32, 8) # [N, C, H] self.inputs = [np.random.random(shape).astype("float32")] self.outputs = None self.attrs = {"num_channels": 32, "num_filters": 64, "num_layers": 2} self.param_states = {} @staticmethod def model_init(model, num_channels, num_filters, num_layers): model.cnn_encoder = CNNEncoder( num_layers=2, num_channels=num_channels, num_filters=num_filters, filter_size=[2, 3], pool_size=[7, 6]) @staticmethod def model_forward(model, inputs): return model.cnn_encoder(inputs) def make_inputs(self): inputs = [ Input([None, self.inputs[-1].shape[1], None], "float32", "input"), ] return inputs def test_check_output(self): self.check_output() if __name__ == '__main__': unittest.main()