# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid import framework
from paddle.fluid.contrib.layers import basic_lstm
from paddle.fluid.executor import Executor

np.random.seed(123)

SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
EXP_MAX_INPUT = 40.0


def sigmoid(x):
    # Clip the input before exponentiation to avoid overflow.
    y = np.copy(x)
    y[x < SIGMOID_THRESHOLD_MIN] = SIGMOID_THRESHOLD_MIN
    y[x > SIGMOID_THRESHOLD_MAX] = SIGMOID_THRESHOLD_MAX
    return 1.0 / (1.0 + np.exp(-y))


def tanh(x):
    # tanh(x) = 2 * sigmoid(2x) - 1, with the exponent clipped for stability.
    y = -2.0 * x
    y[y > EXP_MAX_INPUT] = EXP_MAX_INPUT
    return (2.0 / (1.0 + np.exp(y))) - 1.0


def lstm_np(
    input,
    init_h,
    init_c,
    hidden_size,
    gate_weight,
    gate_bias,
    num_layers=1,
    batch_first=False,
    is_bidirect=False,
    sequence_length=None,
    forget_bias=1.0,
):
    def step(step_in, pre_hidden, pre_cell, gate_w, gate_b):
        # Standard LSTM cell: a single matmul produces the i, j, f, o gates.
        concat_1 = np.concatenate([step_in, pre_hidden], 1)
        gate_input = np.matmul(concat_1, gate_w)
        gate_input += gate_b
        i, j, f, o = np.split(gate_input, indices_or_sections=4, axis=1)
        new_cell = pre_cell * sigmoid(f + forget_bias) + sigmoid(i) * tanh(j)
        new_hidden = tanh(new_cell) * sigmoid(o)
        return new_hidden, new_cell

    mask = None
    if batch_first:
        input = np.transpose(input, [1, 0, 2])

    batch_size = input.shape[1]
    if sequence_length is not None:
        max_seq_len = input.shape[0]
        mask = np.zeros([batch_size, max_seq_len])
        for index, seq_length in enumerate(sequence_length):
            mask[index, :seq_length] = 1.0
        mask = np.transpose(mask, [1, 0])

    direc_num = 1
    if is_bidirect:
        direc_num = 2
    if init_h is not None:
        init_h = np.reshape(init_h, [num_layers, direc_num, -1, hidden_size])
        init_c = np.reshape(init_c, [num_layers, direc_num, -1, hidden_size])
    else:
        init_h = np.zeros([num_layers, direc_num, batch_size, hidden_size])
        init_c = np.zeros([num_layers, direc_num, batch_size, hidden_size])

    def get_single_direction_output(rnn_input, mask=None, direc_index=0):
        seq_len = rnn_input.shape[0]

        output = []
        # Initialize the per-layer hidden and cell states for this direction.
        pre_hidden_array = []
        pre_cell_array = []
        for i in range(num_layers):
            pre_hidden_array.append(init_h[i, direc_index])
            pre_cell_array.append(init_c[i, direc_index])

        for i in range(seq_len):
            step_input = rnn_input[i]
            if mask is not None:
                step_mask = mask[i]
                step_mask = np.reshape(step_mask, [-1, 1])
            for j in range(num_layers):
                new_hidden, new_cell = step(
                    step_input,
                    pre_hidden_array[j],
                    pre_cell_array[j],
                    gate_weight[direc_index * num_layers + j],
                    gate_bias[direc_index * num_layers + j],
                )
                if mask is not None:
                    # For finished sequences, carry the previous state forward.
                    new_hidden = np.multiply(
                        new_hidden, step_mask
                    ) - np.multiply(pre_hidden_array[j], (step_mask - 1.0))
                    new_cell = np.multiply(new_cell, step_mask) - np.multiply(
                        pre_cell_array[j], (step_mask - 1.0)
                    )

                pre_hidden_array[j] = new_hidden
                pre_cell_array[j] = new_cell

                step_input = new_hidden
            output.append(step_input)

        rnn_out = np.concatenate(output, 0)
        rnn_out = np.reshape(rnn_out, [seq_len, -1, hidden_size])

        last_hidden_out = np.concatenate(pre_hidden_array, 0)
        last_hidden_out = np.reshape(
            last_hidden_out, [num_layers, -1, hidden_size]
        )

        last_cell_out = np.concatenate(pre_cell_array, 0)
        last_cell_out = np.reshape(last_cell_out, [num_layers, -1, hidden_size])

        return rnn_out, last_hidden_out, last_cell_out

    fw_rnn_out, fw_last_hidden, fw_last_cell = get_single_direction_output(
        input, mask, direc_index=0
    )

    if is_bidirect:
        # Run the same recurrence over the time-reversed input, then flip
        # the output back before concatenating with the forward direction.
        bw_input = input[::-1]
        bw_mask = None
        if mask is not None:
            bw_mask = mask[::-1]

        bw_rnn_out, bw_last_hidden, bw_last_cell = get_single_direction_output(
            bw_input, bw_mask, direc_index=1
        )

        bw_rnn_out = bw_rnn_out[::-1]

        rnn_out = np.concatenate([fw_rnn_out, bw_rnn_out], 2)
        last_hidden = np.concatenate([fw_last_hidden, bw_last_hidden], 1)
        last_hidden = np.reshape(
            last_hidden, [num_layers * direc_num, -1, hidden_size]
        )

        last_cell = np.concatenate([fw_last_cell, bw_last_cell], 1)
        last_cell = np.reshape(
            last_cell, [num_layers * direc_num, -1, hidden_size]
        )
    else:
        rnn_out = fw_rnn_out
        last_hidden = fw_last_hidden
        last_cell = fw_last_cell

    if batch_first:
        rnn_out = np.transpose(rnn_out, [1, 0, 2])

    return rnn_out, last_hidden, last_cell
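
# A minimal sketch (not exercised by the test below) of calling the numpy
# reference standalone. All names and sizes here are hypothetical; the gate
# weight layout [input_size + hidden_size, 4 * hidden_size] is implied by
# step() above, which concatenates the step input with the previous hidden
# state and splits the matmul result into the i, j, f, o gates.
def _example_lstm_np_usage():
    seq_len, batch_size, hidden_size = 4, 2, 3
    x = np.random.uniform(
        -0.1, 0.1, (seq_len, batch_size, hidden_size)
    ).astype('float32')
    # One gate weight/bias pair per layer (num_layers defaults to 1 here).
    gate_w = [
        np.random.uniform(
            -0.1, 0.1, (2 * hidden_size, 4 * hidden_size)
        ).astype('float32')
    ]
    gate_b = [np.zeros(4 * hidden_size, dtype='float32')]
    rnn_out, last_hidden, last_cell = lstm_np(
        x, None, None, hidden_size, gate_w, gate_b
    )
    # rnn_out: [seq_len, batch_size, hidden_size]
    # last_hidden, last_cell: [num_layers, batch_size, hidden_size]
    return rnn_out, last_hidden, last_cell
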

class TestBasicLSTMApi(unittest.TestCase):
    def setUp(self):
        self.hidden_size = 10
        self.batch_size = 5
        self.seq_len = 6
        self.num_layers = 2
        self.is_bidirect = True
        self.batch_first = False
        self.forget_bias = 1.0

    def test_run(self):
        x = layers.data(
            name='x',
            shape=[-1, self.batch_size, self.hidden_size],
            dtype='float32',
        )
        sequence_length = layers.data(
            name="sequence_length", shape=[-1], dtype='int64'
        )

        rnn_out, last_hidden, last_cell = basic_lstm(
            x,
            None,
            None,
            self.hidden_size,
            num_layers=self.num_layers,
            batch_first=self.batch_first,
            bidirectional=self.is_bidirect,
            sequence_length=sequence_length,
            forget_bias=self.forget_bias,
        )

        last_hidden.persistable = True
        rnn_out.persistable = True

        if core.is_compiled_with_cuda():
            place = core.CUDAPlace(0)
        else:
            place = core.CPUPlace()

        exe = Executor(place)
        exe.run(framework.default_startup_program())

        param_list = fluid.default_main_program().block(0).all_parameters()

        # Overwrite the initialized parameters with known random values so
        # the numpy reference implementation can reproduce the same result.
        gate_weight = []
        gate_bias = []
        for i in range(self.num_layers):
            gate_w_name = "basic_lstm_layers_" + str(i) + "/BasicLSTMUnit_0.w_0"
            gate_b_name = "basic_lstm_layers_" + str(i) + "/BasicLSTMUnit_0.b_0"

            gate_w = np.array(
                fluid.global_scope().find_var(gate_w_name).get_tensor()
            )
            gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
                'float32'
            )
            fluid.global_scope().find_var(gate_w_name).get_tensor().set(
                gate_w, place
            )

            gate_b = np.array(
                fluid.global_scope().find_var(gate_b_name).get_tensor()
            )
            gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
                'float32'
            )
            fluid.global_scope().find_var(gate_b_name).get_tensor().set(
                gate_b, place
            )

            gate_weight.append(gate_w)
            gate_bias.append(gate_b)

        if self.is_bidirect:
            for i in range(self.num_layers):
                gate_w_name = (
                    "basic_lstm_reverse_layers_"
                    + str(i)
                    + "/BasicLSTMUnit_0.w_0"
                )
                gate_b_name = (
                    "basic_lstm_reverse_layers_"
                    + str(i)
                    + "/BasicLSTMUnit_0.b_0"
                )

                gate_w = np.array(
                    fluid.global_scope().find_var(gate_w_name).get_tensor()
                )
                gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
                    'float32'
                )
                fluid.global_scope().find_var(gate_w_name).get_tensor().set(
                    gate_w, place
                )

                gate_b = np.array(
                    fluid.global_scope().find_var(gate_b_name).get_tensor()
                )
                gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
                    'float32'
                )
                fluid.global_scope().find_var(gate_b_name).get_tensor().set(
                    gate_b, place
                )

                gate_weight.append(gate_w)
                gate_bias.append(gate_b)

        step_input_np = np.random.uniform(
            -0.1, 0.1, (self.seq_len, self.batch_size, self.hidden_size)
        ).astype('float32')
        sequence_length_np = np.random.randint(
            self.seq_len // 2, self.seq_len, size=(self.batch_size)
        ).astype('int64')

        out = exe.run(
            feed={'x': step_input_np, 'sequence_length': sequence_length_np},
            fetch_list=[rnn_out, last_hidden, last_cell],
        )

        api_rnn_out = out[0]
        api_last_hidden = out[1]
        api_last_cell = out[2]

        np_out = lstm_np(
            step_input_np,
            None,
            None,
            self.hidden_size,
            gate_weight,
            gate_bias,
            num_layers=self.num_layers,
            batch_first=self.batch_first,
            is_bidirect=self.is_bidirect,
            sequence_length=sequence_length_np,
        )

        np.testing.assert_allclose(api_rnn_out, np_out[0], rtol=0.0001, atol=0)
        np.testing.assert_allclose(
            api_last_hidden, np_out[1], rtol=0.0001, atol=0
        )
        np.testing.assert_allclose(
            api_last_cell, np_out[2], rtol=0.0001, atol=0
        )

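
# Sketch (not executed by the test) of the mask that lstm_np derives from
# sequence_length: entry [t, b] is 1.0 iff t < sequence_length[b], and a
# masked (finished) step carries the previous hidden/cell state forward
# unchanged. The lengths below are hypothetical.
def _example_sequence_mask():
    lengths = [3, 1]  # per-sample valid lengths for a batch of two
    max_seq_len = 4
    mask = np.zeros([len(lengths), max_seq_len])
    for b, length in enumerate(lengths):
        mask[b, :length] = 1.0
    return np.transpose(mask, [1, 0])  # shape [max_seq_len, batch_size]
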

if __name__ == '__main__':
    unittest.main()