# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import unittest import numpy import paddle.fluid as fluid import paddle.fluid.layers as layers import paddle.fluid.core as core from paddle.fluid.framework import program_guard, Program from paddle.fluid.executor import Executor from paddle.fluid import framework from paddle.fluid.layers.rnn import LSTMCell, GRUCell, RNNCell from paddle.fluid.layers import rnn as dynamic_rnn from paddle.fluid import contrib from paddle.fluid.contrib.layers import basic_lstm import paddle.fluid.layers.utils as utils import numpy as np class TestLSTMCellError(unittest.TestCase): def test_errors(self): with program_guard(Program(), Program()): batch_size, input_size, hidden_size = 4, 16, 16 inputs = fluid.data( name='inputs', shape=[None, input_size], dtype='float32') pre_hidden = fluid.data( name='pre_hidden', shape=[None, hidden_size], dtype='float32') pre_cell = fluid.data( name='pre_cell', shape=[None, hidden_size], dtype='float32') cell = LSTMCell(hidden_size) def test_input_Variable(): np_input = np.random.random( (batch_size, input_size)).astype("float32") cell(np_input, [pre_hidden, pre_cell]) self.assertRaises(TypeError, test_input_Variable) def test_pre_hidden_Variable(): np_pre_hidden = np.random.random( (batch_size, hidden_size)).astype("float32") cell(inputs, [np_pre_hidden, pre_cell]) self.assertRaises(TypeError, test_pre_hidden_Variable) def test_pre_cell_Variable(): np_pre_cell = np.random.random( (batch_size, input_size)).astype("float32") cell(inputs, [pre_hidden, np_pre_cell]) self.assertRaises(TypeError, test_pre_cell_Variable) def test_input_type(): error_inputs = fluid.data( name='error_inputs', shape=[None, input_size], dtype='int32') cell(error_inputs, [pre_hidden, pre_cell]) self.assertRaises(TypeError, test_input_type) def test_pre_hidden_type(): error_pre_hidden = fluid.data( name='error_pre_hidden', shape=[None, hidden_size], dtype='int32') cell(inputs, [error_pre_hidden, pre_cell]) self.assertRaises(TypeError, test_pre_hidden_type) def test_pre_cell_type(): error_pre_cell = fluid.data( name='error_pre_cell', shape=[None, hidden_size], dtype='int32') cell(inputs, [pre_hidden, error_pre_cell]) self.assertRaises(TypeError, test_pre_cell_type) def test_dtype(): # the input type must be Variable LSTMCell(hidden_size, dtype="int32") self.assertRaises(TypeError, test_dtype) class TestLSTMCell(unittest.TestCase): def setUp(self): self.batch_size = 4 self.input_size = 16 self.hidden_size = 16 def test_run(self): inputs = fluid.data( name='inputs', shape=[None, self.input_size], dtype='float32') pre_hidden = fluid.data( name='pre_hidden', shape=[None, self.hidden_size], dtype='float32') pre_cell = fluid.data( name='pre_cell', shape=[None, self.hidden_size], dtype='float32') cell = LSTMCell(self.hidden_size) lstm_hidden_new, lstm_states_new = cell(inputs, [pre_hidden, pre_cell]) lstm_unit = contrib.layers.rnn_impl.BasicLSTMUnit( "basicLSTM", self.hidden_size, None, None, None, None, 1.0, "float32") lstm_hidden, lstm_cell = lstm_unit(inputs, pre_hidden, pre_cell) if core.is_compiled_with_cuda(): place = core.CUDAPlace(0) else: place = core.CPUPlace() exe = Executor(place) exe.run(framework.default_startup_program()) inputs_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.input_size)).astype('float32') pre_hidden_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.hidden_size)).astype('float32') pre_cell_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.hidden_size)).astype('float32') param_names = [[ "LSTMCell/BasicLSTMUnit_0.w_0", "basicLSTM/BasicLSTMUnit_0.w_0" ], ["LSTMCell/BasicLSTMUnit_0.b_0", "basicLSTM/BasicLSTMUnit_0.b_0"]] for names in param_names: param = np.array(fluid.global_scope().find_var(names[0]).get_tensor( )) param = np.random.uniform( -0.1, 0.1, size=param.shape).astype('float32') fluid.global_scope().find_var(names[0]).get_tensor().set(param, place) fluid.global_scope().find_var(names[1]).get_tensor().set(param, place) out = exe.run(feed={ 'inputs': inputs_np, 'pre_hidden': pre_hidden_np, 'pre_cell': pre_cell_np }, fetch_list=[lstm_hidden_new, lstm_hidden]) self.assertTrue(np.allclose(out[0], out[1], rtol=1e-4, atol=0)) class TestGRUCellError(unittest.TestCase): def test_errors(self): with program_guard(Program(), Program()): batch_size, input_size, hidden_size = 4, 16, 16 inputs = fluid.data( name='inputs', shape=[None, input_size], dtype='float32') pre_hidden = layers.data( name='pre_hidden', shape=[None, hidden_size], append_batch_size=False, dtype='float32') cell = GRUCell(hidden_size) def test_input_Variable(): np_input = np.random.random( (batch_size, input_size)).astype("float32") cell(np_input, pre_hidden) self.assertRaises(TypeError, test_input_Variable) def test_pre_hidden_Variable(): np_pre_hidden = np.random.random( (batch_size, hidden_size)).astype("float32") cell(inputs, np_pre_hidden) self.assertRaises(TypeError, test_pre_hidden_Variable) def test_input_type(): error_inputs = fluid.data( name='error_inputs', shape=[None, input_size], dtype='int32') cell(error_inputs, pre_hidden) self.assertRaises(TypeError, test_input_type) def test_pre_hidden_type(): error_pre_hidden = fluid.data( name='error_pre_hidden', shape=[None, hidden_size], dtype='int32') cell(inputs, error_pre_hidden) self.assertRaises(TypeError, test_pre_hidden_type) def test_dtype(): # the input type must be Variable GRUCell(hidden_size, dtype="int32") self.assertRaises(TypeError, test_dtype) class TestGRUCell(unittest.TestCase): def setUp(self): self.batch_size = 4 self.input_size = 16 self.hidden_size = 16 def test_run(self): inputs = fluid.data( name='inputs', shape=[None, self.input_size], dtype='float32') pre_hidden = layers.data( name='pre_hidden', shape=[None, self.hidden_size], append_batch_size=False, dtype='float32') cell = GRUCell(self.hidden_size) gru_hidden_new, _ = cell(inputs, pre_hidden) gru_unit = contrib.layers.rnn_impl.BasicGRUUnit( "basicGRU", self.hidden_size, None, None, None, None, "float32") gru_hidden = gru_unit(inputs, pre_hidden) if core.is_compiled_with_cuda(): place = core.CUDAPlace(0) else: place = core.CPUPlace() exe = Executor(place) exe.run(framework.default_startup_program()) inputs_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.input_size)).astype('float32') pre_hidden_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.hidden_size)).astype('float32') param_names = [ ["GRUCell/BasicGRUUnit_0.w_0", "basicGRU/BasicGRUUnit_0.w_0"], ["GRUCell/BasicGRUUnit_0.w_1", "basicGRU/BasicGRUUnit_0.w_1"], ["GRUCell/BasicGRUUnit_0.b_0", "basicGRU/BasicGRUUnit_0.b_0"], ["GRUCell/BasicGRUUnit_0.b_1", "basicGRU/BasicGRUUnit_0.b_1"] ] for names in param_names: param = np.array(fluid.global_scope().find_var(names[0]).get_tensor( )) param = np.random.uniform( -0.1, 0.1, size=param.shape).astype('float32') fluid.global_scope().find_var(names[0]).get_tensor().set(param, place) fluid.global_scope().find_var(names[1]).get_tensor().set(param, place) out = exe.run(feed={'inputs': inputs_np, 'pre_hidden': pre_hidden_np}, fetch_list=[gru_hidden_new, gru_hidden]) self.assertTrue(np.allclose(out[0], out[1], rtol=1e-4, atol=0)) class TestRnnError(unittest.TestCase): def test_errors(self): with program_guard(Program(), Program()): batch_size = 4 input_size = 16 hidden_size = 16 seq_len = 4 inputs = fluid.data( name='inputs', shape=[None, input_size], dtype='float32') pre_hidden = layers.data( name='pre_hidden', shape=[None, hidden_size], append_batch_size=False, dtype='float32') inputs_basic_lstm = fluid.data( name='inputs_basic_lstm', shape=[None, None, input_size], dtype='float32') sequence_length = fluid.data( name="sequence_length", shape=[None], dtype='int64') inputs_dynamic_rnn = layers.transpose( inputs_basic_lstm, perm=[1, 0, 2]) cell = LSTMCell(hidden_size, name="LSTMCell_for_rnn") np_inputs_dynamic_rnn = np.random.random( (seq_len, batch_size, input_size)).astype("float32") def test_input_Variable(): dynamic_rnn( cell=cell, inputs=np_inputs_dynamic_rnn, sequence_length=sequence_length, is_reverse=False) self.assertRaises(TypeError, test_input_Variable) def test_input_list(): dynamic_rnn( cell=cell, inputs=[np_inputs_dynamic_rnn], sequence_length=sequence_length, is_reverse=False) self.assertRaises(TypeError, test_input_list) def test_initial_states_type(): cell = GRUCell(hidden_size, name="GRUCell_for_rnn") error_initial_states = np.random.random( (batch_size, hidden_size)).astype("float32") dynamic_rnn( cell=cell, inputs=inputs_dynamic_rnn, initial_states=error_initial_states, sequence_length=sequence_length, is_reverse=False) self.assertRaises(TypeError, test_initial_states_type) def test_initial_states_list(): error_initial_states = [ np.random.random( (batch_size, hidden_size)).astype("float32"), np.random.random( (batch_size, hidden_size)).astype("float32") ] dynamic_rnn( cell=cell, inputs=inputs_dynamic_rnn, initial_states=error_initial_states, sequence_length=sequence_length, is_reverse=False) self.assertRaises(TypeError, test_initial_states_type) def test_sequence_length_type(): np_sequence_length = np.random.random( (batch_size)).astype("float32") dynamic_rnn( cell=cell, inputs=inputs_dynamic_rnn, sequence_length=np_sequence_length, is_reverse=False) self.assertRaises(TypeError, test_sequence_length_type) class TestRnn(unittest.TestCase): def setUp(self): self.batch_size = 4 self.input_size = 16 self.hidden_size = 16 self.seq_len = 4 def test_run(self): inputs_basic_lstm = fluid.data( name='inputs_basic_lstm', shape=[None, None, self.input_size], dtype='float32') sequence_length = fluid.data( name="sequence_length", shape=[None], dtype='int64') inputs_dynamic_rnn = layers.transpose(inputs_basic_lstm, perm=[1, 0, 2]) cell = LSTMCell(self.hidden_size, name="LSTMCell_for_rnn") output, final_state = dynamic_rnn( cell=cell, inputs=inputs_dynamic_rnn, sequence_length=sequence_length, is_reverse=False) output_new = layers.transpose(output, perm=[1, 0, 2]) rnn_out, last_hidden, last_cell = basic_lstm(inputs_basic_lstm, None, None, self.hidden_size, num_layers=1, \ batch_first = False, bidirectional=False, sequence_length=sequence_length, forget_bias = 1.0) if core.is_compiled_with_cuda(): place = core.CUDAPlace(0) else: place = core.CPUPlace() exe = Executor(place) exe.run(framework.default_startup_program()) inputs_basic_lstm_np = np.random.uniform( -0.1, 0.1, (self.seq_len, self.batch_size, self.input_size)).astype('float32') sequence_length_np = np.ones( self.batch_size, dtype='int64') * self.seq_len inputs_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.input_size)).astype('float32') pre_hidden_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.hidden_size)).astype('float32') pre_cell_np = np.random.uniform( -0.1, 0.1, (self.batch_size, self.hidden_size)).astype('float32') param_names = [[ "LSTMCell_for_rnn/BasicLSTMUnit_0.w_0", "basic_lstm_layers_0/BasicLSTMUnit_0.w_0" ], [ "LSTMCell_for_rnn/BasicLSTMUnit_0.b_0", "basic_lstm_layers_0/BasicLSTMUnit_0.b_0" ]] for names in param_names: param = np.array(fluid.global_scope().find_var(names[0]).get_tensor( )) param = np.random.uniform( -0.1, 0.1, size=param.shape).astype('float32') fluid.global_scope().find_var(names[0]).get_tensor().set(param, place) fluid.global_scope().find_var(names[1]).get_tensor().set(param, place) out = exe.run(feed={ 'inputs_basic_lstm': inputs_basic_lstm_np, 'sequence_length': sequence_length_np, 'inputs': inputs_np, 'pre_hidden': pre_hidden_np, 'pre_cell': pre_cell_np }, fetch_list=[output_new, rnn_out]) self.assertTrue(np.allclose(out[0], out[1], rtol=1e-4)) class TestRnnUtil(unittest.TestCase): """ Test cases for rnn apis' utility methods for coverage. """ def test_case(self): inputs = {"key1": 1, "key2": 2} func = lambda x: x + 1 outputs = utils.map_structure(func, inputs) utils.assert_same_structure(inputs, outputs) try: inputs["key3"] = 3 utils.assert_same_structure(inputs, outputs) except ValueError as identifier: pass class EncoderCell(RNNCell): """Encoder Cell""" def __init__( self, num_layers, hidden_size, dropout_prob=0., init_scale=0.1, ): self.num_layers = num_layers self.hidden_size = hidden_size self.dropout_prob = dropout_prob self.lstm_cells = [] for i in range(num_layers): self.lstm_cells.append(LSTMCell(hidden_size)) def call(self, step_input, states): new_states = [] for i in range(self.num_layers): out, new_state = self.lstm_cells[i](step_input, states[i]) step_input = layers.dropout( out, self.dropout_prob, ) if self.dropout_prob else out new_states.append(new_state) return step_input, new_states @property def state_shape(self): return [cell.state_shape for cell in self.lstm_cells] class DecoderCell(RNNCell): """Decoder Cell""" def __init__(self, num_layers, hidden_size, dropout_prob=0.): self.num_layers = num_layers self.hidden_size = hidden_size self.dropout_prob = dropout_prob self.lstm_cells = [] for i in range(num_layers): self.lstm_cells.append(LSTMCell(hidden_size)) def call(self, step_input, states): new_lstm_states = [] for i in range(self.num_layers): out, new_lstm_state = self.lstm_cells[i](step_input, states[i]) step_input = layers.dropout( out, self.dropout_prob, ) if self.dropout_prob else out new_lstm_states.append(new_lstm_state) return step_input, new_lstm_states def def_seq2seq_model(num_layers, hidden_size, dropout_prob, src_vocab_size, trg_vocab_size): "vanilla seq2seq model" # data source = fluid.data(name="src", shape=[None, None], dtype="int64") source_length = fluid.data( name="src_sequence_length", shape=[None], dtype="int64") target = fluid.data(name="trg", shape=[None, None], dtype="int64") target_length = fluid.data( name="trg_sequence_length", shape=[None], dtype="int64") label = fluid.data(name="label", shape=[None, None, 1], dtype="int64") # embedding src_emb = fluid.embedding(source, (src_vocab_size, hidden_size)) tar_emb = fluid.embedding(target, (src_vocab_size, hidden_size)) # encoder enc_cell = EncoderCell(num_layers, hidden_size, dropout_prob) enc_output, enc_final_state = dynamic_rnn( cell=enc_cell, inputs=src_emb, sequence_length=source_length) # decoder dec_cell = DecoderCell(num_layers, hidden_size, dropout_prob) dec_output, dec_final_state = dynamic_rnn( cell=dec_cell, inputs=tar_emb, initial_states=enc_final_state) logits = layers.fc(dec_output, size=trg_vocab_size, num_flatten_dims=len(dec_output.shape) - 1, bias_attr=False) # loss loss = layers.softmax_with_cross_entropy( logits=logits, label=label, soft_label=False) loss = layers.unsqueeze(loss, axes=[2]) max_tar_seq_len = layers.shape(target)[1] tar_mask = layers.sequence_mask( target_length, maxlen=max_tar_seq_len, dtype="float32") loss = loss * tar_mask loss = layers.reduce_mean(loss, dim=[0]) loss = layers.reduce_sum(loss) # optimizer optimizer = fluid.optimizer.Adam(0.001) optimizer.minimize(loss) return loss class TestSeq2SeqModel(unittest.TestCase): """ Test cases to confirm seq2seq api training correctly. """ def setUp(self): np.random.seed(123) self.model_hparams = { "num_layers": 2, "hidden_size": 128, "dropout_prob": 0.1, "src_vocab_size": 100, "trg_vocab_size": 100 } self.iter_num = iter_num = 2 self.batch_size = batch_size = 4 src_seq_len = 10 trg_seq_len = 12 self.data = { "src": np.random.randint( 2, self.model_hparams["src_vocab_size"], (iter_num * batch_size, src_seq_len)).astype("int64"), "src_sequence_length": np.random.randint( 1, src_seq_len, (iter_num * batch_size, )).astype("int64"), "trg": np.random.randint( 2, self.model_hparams["src_vocab_size"], (iter_num * batch_size, trg_seq_len)).astype("int64"), "trg_sequence_length": np.random.randint( 1, trg_seq_len, (iter_num * batch_size, )).astype("int64"), "label": np.random.randint( 2, self.model_hparams["src_vocab_size"], (iter_num * batch_size, trg_seq_len, 1)).astype("int64"), } place = core.CUDAPlace(0) if core.is_compiled_with_cuda( ) else core.CPUPlace() self.exe = Executor(place) def test_seq2seq_model(self): main_program = fluid.Program() startup_program = fluid.Program() with fluid.program_guard(main_program, startup_program): cost = def_seq2seq_model(**self.model_hparams) self.exe.run(startup_program) for iter_idx in range(self.iter_num): cost_val = self.exe.run(feed={ "src": self.data["src"][iter_idx * self.batch_size:( iter_idx + 1) * self.batch_size, :], "src_sequence_length": self.data["src_sequence_length"] [iter_idx * self.batch_size:(iter_idx + 1) * self.batch_size], "trg": self.data["trg"][iter_idx * self.batch_size:( iter_idx + 1) * self.batch_size, :], "trg_sequence_length": self.data["trg_sequence_length"][ iter_idx * self.batch_size:(iter_idx + 1 ) * self.batch_size], "label": self.data["label"][iter_idx * self.batch_size:( iter_idx + 1) * self.batch_size] }, fetch_list=[cost])[0] print("iter_idx: %d, cost: %f" % (iter_idx, cost_val)) if __name__ == '__main__': unittest.main()