diff --git a/tensorflow/python/keras/layers/unified_lstm_test.py b/tensorflow/python/keras/layers/unified_lstm_test.py
index 55ccebb43b43cc19b272d8dc5564d06b76fa1c37..c51304666d3f6b830c5a815db385921838ca9694 100644
--- a/tensorflow/python/keras/layers/unified_lstm_test.py
+++ b/tensorflow/python/keras/layers/unified_lstm_test.py
@@ -33,6 +33,7 @@ from tensorflow.python.eager import context
 from tensorflow.python.framework import constant_op
 from tensorflow.python.framework import dtypes
 from tensorflow.python.framework import test_util
+from tensorflow.python.keras import keras_parameterized
 from tensorflow.python.keras import testing_utils
 from tensorflow.python.ops import array_ops
 from tensorflow.python.ops import control_flow_ops
@@ -54,9 +55,252 @@ _graph_options = config_pb2.GraphOptions(rewrite_options=_rewrites)
 _config = config_pb2.ConfigProto(graph_options=_graph_options)
 
 
-@test_util.run_v1_only('b/120545219')
-class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
+@keras_parameterized.run_all_keras_modes(config=_config)
+class UnifiedLSTMTest(keras_parameterized.TestCase):
 
+  @parameterized.named_parameters(
+      ('non_tan_activation', 'relu', 'sigmoid', 0, False, True),
+      ('non_sigmoid_recur_activation', 'tanh', 'relu', 0, False, True),
+      ('use_recurrent_dropout', 'tanh', 'sigmoid', 0.1, False, True),
+      ('unroll', 'tanh', 'sigmoid', 0, True, True),
+      ('not_use_bias', 'tanh', 'sigmoid', 0, False, False),
+  )
+  def test_could_use_defun_backend(self, activation, recurrent_activation,
+                                   recurrent_dropout, unroll, use_bias):
+    layer = keras.layers.UnifiedLSTM(
+        1,
+        activation=activation,
+        recurrent_activation=recurrent_activation,
+        recurrent_dropout=recurrent_dropout,
+        unroll=unroll,
+        use_bias=use_bias)
+    self.assertFalse(layer.could_use_cudnn)
+
+  def test_static_shape_inference_LSTM(self):
+    # Github issue: 15165
+    timesteps = 3
+    embedding_dim = 4
+    units = 2
+
+    model = keras.models.Sequential()
+    inputs = keras.layers.Dense(
+        embedding_dim, input_shape=(timesteps, embedding_dim))
+    model.add(inputs)
+    layer = keras.layers.UnifiedLSTM(units, return_sequences=True)
+    model.add(layer)
+    outputs = model.layers[-1].output
+    self.assertEqual(outputs.get_shape().as_list(), [None, timesteps, units])
+
+  def test_dynamic_behavior_LSTM(self):
+    num_samples = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 2
+    layer = keras.layers.UnifiedLSTM(units, input_shape=(None, embedding_dim))
+    model = keras.models.Sequential()
+    model.add(layer)
+    model.compile(gradient_descent.GradientDescentOptimizer(0.001), 'mse')
+    x = np.random.random((num_samples, timesteps, embedding_dim))
+    y = np.random.random((num_samples, units))
+    model.train_on_batch(x, y)
+
+  def test_stacking_LSTM(self):
+    inputs = np.random.random((2, 3, 4))
+    targets = np.abs(np.random.random((2, 3, 5)))
+    targets /= targets.sum(axis=-1, keepdims=True)
+    model = keras.models.Sequential()
+    model.add(keras.layers.UnifiedLSTM(10, return_sequences=True, unroll=False))
+    model.add(keras.layers.UnifiedLSTM(5, return_sequences=True, unroll=False))
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
+
+  def test_from_config_LSTM(self):
+    layer_class = keras.layers.UnifiedLSTM
+    for stateful in (False, True):
+      l1 = layer_class(units=1, stateful=stateful)
+      l2 = layer_class.from_config(l1.get_config())
+      assert l1.get_config() == l2.get_config()
+
+  def test_specify_initial_state_keras_tensor(self):
+    num_states = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    # Test with Keras tensor
+    inputs = keras.Input((timesteps, embedding_dim))
+    initial_state = [keras.Input((units,)) for _ in range(num_states)]
+    layer = keras.layers.UnifiedLSTM(units)
+    if len(initial_state) == 1:
+      output = layer(inputs, initial_state=initial_state[0])
+    else:
+      output = layer(inputs, initial_state=initial_state)
+    assert initial_state[0] in layer._inbound_nodes[0].input_tensors
+
+    model = keras.models.Model([inputs] + initial_state, output)
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+
+    inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    initial_state = [
+        np.random.random((num_samples, units)) for _ in range(num_states)
+    ]
+    targets = np.random.random((num_samples, units))
+    model.train_on_batch([inputs] + initial_state, targets)
+
+  def DISABLED_test_specify_initial_state_non_keras_tensor(self):
+    num_states = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    # Test with non-Keras tensor
+    inputs = keras.Input((timesteps, embedding_dim))
+    initial_state = [
+        keras.backend.random_normal_variable((num_samples, units), 0, 1)
+        for _ in range(num_states)
+    ]
+    layer = keras.layers.UnifiedLSTM(units)
+    output = layer(inputs, initial_state=initial_state)
+
+    model = keras.models.Model(inputs, output)
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+
+    inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    targets = np.random.random((num_samples, units))
+    model.train_on_batch(inputs, targets)
+
+  def test_reset_states_with_values(self):
+    num_states = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    layer = keras.layers.UnifiedLSTM(units, stateful=True)
+    layer.build((num_samples, timesteps, embedding_dim))
+    layer.reset_states()
+    assert len(layer.states) == num_states
+    assert layer.states[0] is not None
+    self.assertAllClose(
+        keras.backend.eval(layer.states[0]),
+        np.zeros(keras.backend.int_shape(layer.states[0])),
+        atol=1e-4)
+    state_shapes = [keras.backend.int_shape(state) for state in layer.states]
+    values = [np.ones(shape) for shape in state_shapes]
+    if len(values) == 1:
+      values = values[0]
+    layer.reset_states(values)
+    self.assertAllClose(
+        keras.backend.eval(layer.states[0]),
+        np.ones(keras.backend.int_shape(layer.states[0])),
+        atol=1e-4)
+
+    # Test with invalid data
+    with self.assertRaises(ValueError):
+      layer.reset_states([1] * (len(layer.states) + 1))
+
+  def test_specify_state_with_masking(self):
+    num_states = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    inputs = keras.Input((timesteps, embedding_dim))
+    _ = keras.layers.Masking()(inputs)
+    initial_state = [keras.Input((units,)) for _ in range(num_states)]
+    output = keras.layers.UnifiedLSTM(units)(
+        inputs, initial_state=initial_state)
+
+    model = keras.models.Model([inputs] + initial_state, output)
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+
+    inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    initial_state = [
+        np.random.random((num_samples, units)) for _ in range(num_states)
+    ]
+    targets = np.random.random((num_samples, units))
+    model.train_on_batch([inputs] + initial_state, targets)
+
+  def test_return_state(self):
+    num_states = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
+    layer = keras.layers.UnifiedLSTM(units, return_state=True, stateful=True)
+    outputs = layer(inputs)
+    state = outputs[1:]
+    assert len(state) == num_states
+    model = keras.models.Model(inputs, state[0])
+
+    inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    state = model.predict(inputs)
+    self.assertAllClose(keras.backend.eval(layer.states[0]), state, atol=1e-4)
+
+  def test_state_reuse(self):
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+
+    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
+    layer = keras.layers.UnifiedLSTM(
+        units, return_state=True, return_sequences=True)
+    outputs = layer(inputs)
+    output, state = outputs[0], outputs[1:]
+    output = keras.layers.UnifiedLSTM(units)(output, initial_state=state)
+    model = keras.models.Model(inputs, output)
+
+    inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    model.predict(inputs)
+
+  def test_initial_states_as_other_inputs(self):
+    timesteps = 3
+    embedding_dim = 4
+    units = 3
+    num_samples = 2
+    num_states = 2
+    layer_class = keras.layers.UnifiedLSTM
+
+    # Test with Keras tensor
+    main_inputs = keras.Input((timesteps, embedding_dim))
+    initial_state = [keras.Input((units,)) for _ in range(num_states)]
+    inputs = [main_inputs] + initial_state
+
+    layer = layer_class(units)
+    output = layer(inputs)
+    assert initial_state[0] in layer._inbound_nodes[0].input_tensors
+
+    model = keras.models.Model(inputs, output)
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+
+    main_inputs = np.random.random((num_samples, timesteps, embedding_dim))
+    initial_state = [
+        np.random.random((num_samples, units)) for _ in range(num_states)
+    ]
+    targets = np.random.random((num_samples, units))
+    model.train_on_batch([main_inputs] + initial_state, targets)
+
+
+class LSTMLayerGraphOnlyTest(test.TestCase):
+
+  # Need session for test
+  @test_util.run_deprecated_v1
   def test_unifiedLSTM(self):
     input_shape = 10
     rnn_state_size = 8
@@ -101,6 +345,8 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
         self.assertNotEqual(existing_loss, loss_value)
         existing_loss = loss_value
 
+  # Need session for test
+  @test_util.run_deprecated_v1
   def test_unifiedLSTM_with_cond(self):
     # This test is to demonstrate the graph rewrite of grappler plugin under
     # the condition that the function returns different number of internal
@@ -158,25 +404,48 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
         self.assertNotEqual(existing_loss, loss_value)
         existing_loss = loss_value
 
-  @parameterized.named_parameters(
-      ('non_tan_activation', 'relu', 'sigmoid', 0, False, True),
-      ('non_sigmoid_recur_activation', 'tanh', 'relu', 0, False, True),
-      ('use_recurrent_dropout', 'tanh', 'sigmoid', 0.1, False, True),
-      ('unroll', 'tanh', 'sigmoid', 0, True, True),
-      ('not_use_bias', 'tanh', 'sigmoid', 0, False, False),
-  )
+  # b/120919032
+  @test_util.run_deprecated_v1
+  def test_regularizers_LSTM(self):
+    embedding_dim = 4
+    layer_class = keras.layers.UnifiedLSTM
+    layer = layer_class(
+        5,
+        return_sequences=False,
+        weights=None,
+        input_shape=(None, embedding_dim),
+        kernel_regularizer=keras.regularizers.l1(0.01),
+        recurrent_regularizer=keras.regularizers.l1(0.01),
+        bias_regularizer='l2',
+        activity_regularizer='l1')
+    layer.build((None, None, 2))
+    self.assertEqual(len(layer.losses), 3)
+    x = keras.backend.variable(np.ones((2, 3, 2)))
+    layer(x)
+    self.assertEqual(len(layer.get_losses_for(x)), 1)
+
+
+# TODO(scottzhu): Re-enable those tests in v2 mode once bugs attached are fixed.
+@test_util.run_v1_only
+class LSTMLayerV1OnlyTest(test.TestCase, parameterized.TestCase):
+
+  # b/120911602
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_could_use_defun_backend(self, activation, recurrent_activation,
-                                   recurrent_dropout, unroll, use_bias):
-    layer = keras.layers.UnifiedLSTM(
-        1,
-        activation=activation,
-        recurrent_activation=recurrent_activation,
-        recurrent_dropout=recurrent_dropout,
-        unroll=unroll,
-        use_bias=use_bias)
-    self.assertFalse(layer.could_use_cudnn)
+  def test_dropout_LSTM(self):
+    num_samples = 2
+    timesteps = 3
+    embedding_dim = 4
+    units = 2
+    testing_utils.layer_test(
+        keras.layers.UnifiedLSTM,
+        kwargs={
+            'units': units,
+            'dropout': 0.1,
+            'recurrent_dropout': 0.1
+        },
+        input_shape=(num_samples, timesteps, embedding_dim))
 
+  # b/120911602
   def test_unified_lstm_feature_parity_with_canonical_lstm(self):
     with context.eager_mode():
       # Run this test under eager only due to b/120160788 for model.set_weights.
@@ -216,85 +485,67 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
       self.assertAllClose(y_1, y_3)
       self.assertAllClose(y_2, y_4)
 
-  @parameterized.named_parameters(
-      # test_name, use_bias, bias_initializer, activation
-      ('normal', True, 'zeros'),
-      ('no_bias', False, 'zeros'),
-      ('random_bias', True, 'random_uniform'),
-  )
+  # b/120911602
+  @parameterized.named_parameters(('v0', 0), ('v1', 1), ('v2', 2))
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_unified_lstm_model_save_load(self, use_bias, bias_initializer):
-    temp_dir = self.get_temp_dir()
-    self.addCleanup(shutil.rmtree, temp_dir)
-    h5_path = os.path.join(temp_dir, 'test.h5')
-
-    batch = 10
-    timestep = 3
-    input_dim = 5
+  def test_implementation_mode_LSTM(self, implementation_mode):
+    num_samples = 2
+    timesteps = 3
+    embedding_dim = 4
     units = 2
+    testing_utils.layer_test(
+        keras.layers.UnifiedLSTM,
+        kwargs={
+            'units': units,
+            'implementation': implementation_mode
+        },
+        input_shape=(num_samples, timesteps, embedding_dim))
 
-    x = np.random.random((batch, timestep, input_dim))
-
-    def build_model():
-      inputs = keras.layers.Input(
-          shape=[timestep, input_dim], dtype=dtypes.float32)
-      layer = keras.layers.UnifiedLSTM(
-          units,
-          use_bias=use_bias,
-          bias_initializer=bias_initializer)
-      output = layer(inputs)
-      return keras.models.Model(inputs, output), layer
-
-    model, layer = build_model()
-    y_ref = model.predict(x)
-    model.save_weights(h5_path)
-
-    cloned_model, new_layer = build_model()
-    cloned_model.load_weights(h5_path)
-    y = cloned_model.predict(x)
+    layer_class = keras.layers.UnifiedLSTM
+    k_constraint = keras.constraints.max_norm(0.01)
+    r_constraint = keras.constraints.max_norm(0.01)
+    b_constraint = keras.constraints.max_norm(0.01)
+    layer = layer_class(
+        5,
+        return_sequences=False,
+        weights=None,
+        input_shape=(None, embedding_dim),
+        kernel_constraint=k_constraint,
+        recurrent_constraint=r_constraint,
+        bias_constraint=b_constraint)
+    layer.build((None, None, embedding_dim))
+    self.assertEqual(layer.cell.kernel.constraint, k_constraint)
+    self.assertEqual(layer.cell.recurrent_kernel.constraint, r_constraint)
+    self.assertEqual(layer.cell.bias.constraint, b_constraint)
 
-    self.assertAllClose(y, y_ref)
-    self.assertAllClose(layer.get_weights(), new_layer.get_weights())
+    layer_class = keras.layers.UnifiedLSTM
+    inputs = np.random.random((2, 3, 4))
+    targets = np.abs(np.random.random((2, 3, 5)))
+    targets /= targets.sum(axis=-1, keepdims=True)
+    model = keras.models.Sequential()
+    model.add(keras.layers.Masking(input_shape=(3, 4)))
+    model.add(layer_class(units=5, return_sequences=True, unroll=False))
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
 
+  # b/120911602
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_unified_lstm_output_on_multiple_kernel(self):
-    input_shape = 10
-    rnn_state_size = 8
-    timestep = 4
-    batch = 100
-
-    x_train = np.random.random((batch, timestep, input_shape))
-
-    inputs = keras.layers.Input(
-        shape=[timestep, input_shape], dtype=dtypes.float32)
-    with test_util.device(use_gpu=False):
-      layer = keras.layers.UnifiedLSTM(rnn_state_size)
-      output = layer(inputs)
-      cpu_model = keras.models.Model(inputs, output)
-      weights = cpu_model.get_weights()
-      y_1 = cpu_model.predict(x_train)
-
-    with test_util.device(use_gpu=True):
-      layer = keras.layers.UnifiedLSTM(rnn_state_size)
-      output = layer(inputs)
-      gpu_model = keras.models.Model(inputs, output)
-      gpu_model.set_weights(weights)
-      y_2 = gpu_model.predict(x_train)
-
-    # Note that CuDNN uses 'sigmoid' as activation, so the unified LSTM uses
-    # 'sigmoid' as default. Construct the canonical LSTM with sigmoid to achieve
-    # the same output.
-    with test_util.device(use_gpu=True):
-      layer = keras.layers.LSTM(rnn_state_size, recurrent_activation='sigmoid')
-      output = layer(inputs)
-      canonical_model = keras.models.Model(inputs, output)
-      # Remove the extra cudnn bias since canonical lstm will not use it.
-      canonical_model.set_weights(weights[:3])
-      y_3 = canonical_model.predict(x_train)
-
-    self.assertAllClose(y_1, y_2)
-    self.assertAllClose(y_2, y_3)
+  def test_masking_with_stacking_LSTM(self):
+    inputs = np.random.random((2, 3, 4))
+    targets = np.abs(np.random.random((2, 3, 5)))
+    targets /= targets.sum(axis=-1, keepdims=True)
+    model = keras.models.Sequential()
+    model.add(keras.layers.Masking(input_shape=(3, 4)))
+    model.add(keras.layers.UnifiedLSTM(10, return_sequences=True, unroll=False))
+    model.add(keras.layers.UnifiedLSTM(5, return_sequences=True, unroll=False))
+    model.compile(
+        loss='categorical_crossentropy',
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
 
+  # b/120911602
   @parameterized.named_parameters(
       # test_name, time_major, go_backwards
       ('normal', False, False),
@@ -339,8 +590,6 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
 
     self.assertAllClose(y, y_ref)
 
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_keras_model_with_lstm(self):
     input_shape = 10
     rnn_state_size = 8
     output_shape = 8
@@ -367,52 +616,89 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
     model.evaluate(x_train, y_train)
     model.predict(x_train)
 
+  # b/120911602
+  @parameterized.named_parameters(
+      # test_name, use_bias, bias_initializer, activation
+      ('normal', True, 'zeros'),
+      ('no_bias', False, 'zeros'),
+      ('random_bias', True, 'random_uniform'),
+  )
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_return_sequences_LSTM(self):
-    num_samples = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 2
-    testing_utils.layer_test(
-        keras.layers.UnifiedLSTM,
-        kwargs={
-            'units': units,
-            'return_sequences': True
-        },
-        input_shape=(num_samples, timesteps, embedding_dim))
+  def test_unified_lstm_model_save_load(self, use_bias, bias_initializer):
+    temp_dir = self.get_temp_dir()
+    self.addCleanup(shutil.rmtree, temp_dir)
+    h5_path = os.path.join(temp_dir, 'test.h5')
 
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_static_shape_inference_LSTM(self):
-    # Github issue: 15165
-    timesteps = 3
-    embedding_dim = 4
+    batch = 10
+    timestep = 3
+    input_dim = 5
     units = 2
 
-    model = keras.models.Sequential()
-    inputs = keras.layers.Dense(
-        embedding_dim, input_shape=(timesteps, embedding_dim))
-    model.add(inputs)
-    layer = keras.layers.UnifiedLSTM(units, return_sequences=True)
-    model.add(layer)
-    outputs = model.layers[-1].output
-    self.assertEqual(outputs.get_shape().as_list(), [None, timesteps, units])
+    x = np.random.random((batch, timestep, input_dim))
+
+    def build_model():
+      inputs = keras.layers.Input(
+          shape=[timestep, input_dim], dtype=dtypes.float32)
+      layer = keras.layers.UnifiedLSTM(
+          units,
+          use_bias=use_bias,
+          bias_initializer=bias_initializer)
+      output = layer(inputs)
+      return keras.models.Model(inputs, output), layer
+
+    model, layer = build_model()
+    y_ref = model.predict(x)
+    model.save_weights(h5_path)
+
+    cloned_model, new_layer = build_model()
+    cloned_model.load_weights(h5_path)
+    y = cloned_model.predict(x)
+
+    self.assertAllClose(y, y_ref)
+    self.assertAllClose(layer.get_weights(), new_layer.get_weights())
 
+  # b/120911602
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_dynamic_behavior_LSTM(self):
-    num_samples = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 2
-    layer = keras.layers.UnifiedLSTM(units, input_shape=(None, embedding_dim))
-    model = keras.models.Sequential()
-    model.add(layer)
-    model.compile(gradient_descent.GradientDescentOptimizer(0.001), 'mse')
-    x = np.random.random((num_samples, timesteps, embedding_dim))
-    y = np.random.random((num_samples, units))
-    model.train_on_batch(x, y)
+  def test_unified_lstm_output_on_multiple_kernel(self):
+    input_shape = 10
+    rnn_state_size = 8
+    timestep = 4
+    batch = 100
+
+    x_train = np.random.random((batch, timestep, input_shape))
+
+    inputs = keras.layers.Input(
+        shape=[timestep, input_shape], dtype=dtypes.float32)
+    with test_util.device(use_gpu=False):
+      layer = keras.layers.UnifiedLSTM(rnn_state_size)
+      output = layer(inputs)
+      cpu_model = keras.models.Model(inputs, output)
+      weights = cpu_model.get_weights()
+      y_1 = cpu_model.predict(x_train)
+
+    with test_util.device(use_gpu=True):
+      layer = keras.layers.UnifiedLSTM(rnn_state_size)
+      output = layer(inputs)
+      gpu_model = keras.models.Model(inputs, output)
+      gpu_model.set_weights(weights)
+      y_2 = gpu_model.predict(x_train)
+
+    # Note that CuDNN uses 'sigmoid' as activation, so the unified LSTM uses
+    # 'sigmoid' as default. Construct the canonical LSTM with sigmoid to achieve
+    # the same output.
+    with test_util.device(use_gpu=True):
+      layer = keras.layers.LSTM(rnn_state_size, recurrent_activation='sigmoid')
+      output = layer(inputs)
+      canonical_model = keras.models.Model(inputs, output)
+      # Remove the extra cudnn bias since canonical lstm will not use it.
+      canonical_model.set_weights(weights[:3])
+      y_3 = canonical_model.predict(x_train)
+
+    self.assertAllClose(y_1, y_2)
+    self.assertAllClose(y_2, y_3)
 
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_dropout_LSTM(self):
+  def test_return_sequences_LSTM(self):
     num_samples = 2
     timesteps = 3
     embedding_dim = 4
@@ -421,360 +707,73 @@ class UnifiedLSTMTest(test.TestCase, parameterized.TestCase):
         keras.layers.UnifiedLSTM,
         kwargs={
             'units': units,
-            'dropout': 0.1,
-            'recurrent_dropout': 0.1
+            'return_sequences': True
         },
         input_shape=(num_samples, timesteps, embedding_dim))
 
-  @parameterized.parameters([0, 1, 2])
+  # b/120911602
   @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_implementation_mode_LSTM(self, implementation_mode):
+  def test_statefulness_LSTM(self):
     num_samples = 2
     timesteps = 3
     embedding_dim = 4
     units = 2
-    testing_utils.layer_test(
-        keras.layers.UnifiedLSTM,
-        kwargs={
-            'units': units,
-            'implementation': implementation_mode
-        },
-        input_shape=(num_samples, timesteps, embedding_dim))
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_constraints_LSTM(self):
-    embedding_dim = 4
     layer_class = keras.layers.UnifiedLSTM
-    k_constraint = keras.constraints.max_norm(0.01)
-    r_constraint = keras.constraints.max_norm(0.01)
-    b_constraint = keras.constraints.max_norm(0.01)
-    layer = layer_class(
-        5,
-        return_sequences=False,
-        weights=None,
-        input_shape=(None, embedding_dim),
-        kernel_constraint=k_constraint,
-        recurrent_constraint=r_constraint,
-        bias_constraint=b_constraint)
-    layer.build((None, None, embedding_dim))
-    self.assertEqual(layer.cell.kernel.constraint, k_constraint)
-    self.assertEqual(layer.cell.recurrent_kernel.constraint, r_constraint)
-    self.assertEqual(layer.cell.bias.constraint, b_constraint)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_with_masking_layer_LSTM(self):
-    layer_class = keras.layers.UnifiedLSTM
-    inputs = np.random.random((2, 3, 4))
-    targets = np.abs(np.random.random((2, 3, 5)))
-    targets /= targets.sum(axis=-1, keepdims=True)
     model = keras.models.Sequential()
-    model.add(keras.layers.Masking(input_shape=(3, 4)))
-    model.add(layer_class(units=5, return_sequences=True, unroll=False))
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_stacking_LSTM(self):
-    inputs = np.random.random((2, 3, 4))
-    targets = np.abs(np.random.random((2, 3, 5)))
-    targets /= targets.sum(axis=-1, keepdims=True)
-    model = keras.models.Sequential()
-    model.add(keras.layers.UnifiedLSTM(10, return_sequences=True, unroll=False))
-    model.add(keras.layers.UnifiedLSTM(5, return_sequences=True, unroll=False))
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_masking_with_stacking_LSTM(self):
-    inputs = np.random.random((2, 3, 4))
-    targets = np.abs(np.random.random((2, 3, 5)))
-    targets /= targets.sum(axis=-1, keepdims=True)
-    model = keras.models.Sequential()
-    model.add(keras.layers.Masking(input_shape=(3, 4)))
-    model.add(keras.layers.UnifiedLSTM(10, return_sequences=True, unroll=False))
-    model.add(keras.layers.UnifiedLSTM(5, return_sequences=True, unroll=False))
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-    model.fit(inputs, targets, epochs=1, batch_size=2, verbose=1)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_from_config_LSTM(self):
-    layer_class = keras.layers.UnifiedLSTM
-    for stateful in (False, True):
-      l1 = layer_class(units=1, stateful=stateful)
-      l2 = layer_class.from_config(l1.get_config())
-      assert l1.get_config() == l2.get_config()
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_specify_initial_state_keras_tensor(self):
-    num_states = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
-
-    # Test with Keras tensor
-    inputs = keras.Input((timesteps, embedding_dim))
-    initial_state = [keras.Input((units,)) for _ in range(num_states)]
-    layer = keras.layers.UnifiedLSTM(units)
-    if len(initial_state) == 1:
-      output = layer(inputs, initial_state=initial_state[0])
-    else:
-      output = layer(inputs, initial_state=initial_state)
-    assert initial_state[0] in layer._inbound_nodes[0].input_tensors
-
-    model = keras.models.Model([inputs] + initial_state, output)
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-
-    inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    initial_state = [
-        np.random.random((num_samples, units)) for _ in range(num_states)
-    ]
-    targets = np.random.random((num_samples, units))
-    model.train_on_batch([inputs] + initial_state, targets)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def DISABLED_test_specify_initial_state_non_keras_tensor(self):
-    num_states = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
-
-    # Test with non-Keras tensor
-    inputs = keras.Input((timesteps, embedding_dim))
-    initial_state = [
-        keras.backend.random_normal_variable((num_samples, units), 0, 1)
-        for _ in range(num_states)
-    ]
-    layer = keras.layers.UnifiedLSTM(units)
-    output = layer(inputs, initial_state=initial_state)
-
-    model = keras.models.Model(inputs, output)
+    model.add(
+        keras.layers.Embedding(
+            4,
+            embedding_dim,
+            mask_zero=True,
+            input_length=timesteps,
+            batch_input_shape=(num_samples, timesteps)))
+    layer = layer_class(
+        units, return_sequences=False, stateful=True, weights=None)
+    model.add(layer)
     model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
+        optimizer=gradient_descent.GradientDescentOptimizer(0.01), loss='mse')
+    out1 = model.predict(np.ones((num_samples, timesteps)))
+    self.assertEqual(out1.shape, (num_samples, units))
 
-    inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    targets = np.random.random((num_samples, units))
-    model.train_on_batch(inputs, targets)
+    # train once so that the states change
+    model.train_on_batch(
+        np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
+    out2 = model.predict(np.ones((num_samples, timesteps)))
 
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_reset_states_with_values(self):
-    num_states = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
+    # if the state is not reset, output should be different
+    self.assertNotEqual(out1.max(), out2.max())
 
-    layer = keras.layers.UnifiedLSTM(units, stateful=True)
-    layer.build((num_samples, timesteps, embedding_dim))
+    # check that output changes after states are reset
+    # (even though the model itself didn't change)
     layer.reset_states()
-    assert len(layer.states) == num_states
-    assert layer.states[0] is not None
-    self.assertAllClose(
-        keras.backend.eval(layer.states[0]),
-        np.zeros(keras.backend.int_shape(layer.states[0])),
-        atol=1e-4)
-    state_shapes = [keras.backend.int_shape(state) for state in layer.states]
-    values = [np.ones(shape) for shape in state_shapes]
-    if len(values) == 1:
-      values = values[0]
-    layer.reset_states(values)
-    self.assertAllClose(
-        keras.backend.eval(layer.states[0]),
-        np.ones(keras.backend.int_shape(layer.states[0])),
-        atol=1e-4)
+    out3 = model.predict(np.ones((num_samples, timesteps)))
+    self.assertNotEqual(out2.max(), out3.max())
 
-    # Test with invalid data
-    with self.assertRaises(ValueError):
-      layer.reset_states([1] * (len(layer.states) + 1))
+    # check that container-level reset_states() works
+    model.reset_states()
+    out4 = model.predict(np.ones((num_samples, timesteps)))
+    self.assertAllClose(out3, out4, atol=1e-5)
 
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_specify_state_with_masking(self):
-    num_states = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
+    # check that the call to `predict` updated the states
+    out5 = model.predict(np.ones((num_samples, timesteps)))
+    self.assertNotEqual(out4.max(), out5.max())
 
-    inputs = keras.Input((timesteps, embedding_dim))
-    _ = keras.layers.Masking()(inputs)
-    initial_state = [keras.Input((units,)) for _ in range(num_states)]
-    output = keras.layers.UnifiedLSTM(units)(
-        inputs, initial_state=initial_state)
-
-    model = keras.models.Model([inputs] + initial_state, output)
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-
-    inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    initial_state = [
-        np.random.random((num_samples, units)) for _ in range(num_states)
-    ]
-    targets = np.random.random((num_samples, units))
-    model.train_on_batch([inputs] + initial_state, targets)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_return_state(self):
-    num_states = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
-
-    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
-    layer = keras.layers.UnifiedLSTM(units, return_state=True, stateful=True)
-    outputs = layer(inputs)
-    state = outputs[1:]
-    assert len(state) == num_states
-    model = keras.models.Model(inputs, state[0])
-
-    inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    state = model.predict(inputs)
-    self.assertAllClose(keras.backend.eval(layer.states[0]), state, atol=1e-4)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_state_reuse(self):
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
-
-    inputs = keras.Input(batch_shape=(num_samples, timesteps, embedding_dim))
-    layer = keras.layers.UnifiedLSTM(
-        units, return_state=True, return_sequences=True)
-    outputs = layer(inputs)
-    output, state = outputs[0], outputs[1:]
-    output = keras.layers.UnifiedLSTM(units)(output, initial_state=state)
-    model = keras.models.Model(inputs, output)
-
-    inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    model.predict(inputs)
-
-  @test_util.run_in_graph_and_eager_modes(config=_config)
-  def test_initial_states_as_other_inputs(self):
-    timesteps = 3
-    embedding_dim = 4
-    units = 3
-    num_samples = 2
-    num_states = 2
-    layer_class = keras.layers.UnifiedLSTM
-
-    # Test with Keras tensor
-    main_inputs = keras.Input((timesteps, embedding_dim))
-    initial_state = [keras.Input((units,)) for _ in range(num_states)]
-    inputs = [main_inputs] + initial_state
-
-    layer = layer_class(units)
-    output = layer(inputs)
-    assert initial_state[0] in layer._inbound_nodes[0].input_tensors
-
-    model = keras.models.Model(inputs, output)
-    model.compile(
-        loss='categorical_crossentropy',
-        optimizer=gradient_descent.GradientDescentOptimizer(0.01))
-
-    main_inputs = np.random.random((num_samples, timesteps, embedding_dim))
-    initial_state = [
-        np.random.random((num_samples, units)) for _ in range(num_states)
-    ]
-    targets = np.random.random((num_samples, units))
-    model.train_on_batch([main_inputs] + initial_state, targets)
+    # Check masking
+    layer.reset_states()
 
+    left_padded_input = np.ones((num_samples, timesteps))
+    left_padded_input[0, :1] = 0
+    left_padded_input[1, :2] = 0
+    out6 = model.predict(left_padded_input)
 
-@test_util.run_v1_only('b/120545219')
-class LSTMLayerGraphOnlyTest(test.TestCase):
+    layer.reset_states()
 
-  def test_statefulness_LSTM(self):
-    num_samples = 2
-    timesteps = 3
-    embedding_dim = 4
-    units = 2
-    layer_class = keras.layers.UnifiedLSTM
-    with self.cached_session(config=_config):
-      model = keras.models.Sequential()
-      model.add(
-          keras.layers.Embedding(
-              4,
-              embedding_dim,
-              mask_zero=True,
-              input_length=timesteps,
-              batch_input_shape=(num_samples, timesteps)))
-      layer = layer_class(
-          units, return_sequences=False, stateful=True, weights=None)
-      model.add(layer)
-      model.compile(
-          optimizer=gradient_descent.GradientDescentOptimizer(0.01), loss='mse')
-      out1 = model.predict(np.ones((num_samples, timesteps)))
-      self.assertEqual(out1.shape, (num_samples, units))
-
-      # train once so that the states change
-      model.train_on_batch(
-          np.ones((num_samples, timesteps)), np.ones((num_samples, units)))
-      out2 = model.predict(np.ones((num_samples, timesteps)))
-
-      # if the state is not reset, output should be different
-      self.assertNotEqual(out1.max(), out2.max())
-
-      # check that output changes after states are reset
-      # (even though the model itself didn't change)
-      layer.reset_states()
-      out3 = model.predict(np.ones((num_samples, timesteps)))
-      self.assertNotEqual(out2.max(), out3.max())
-
-      # check that container-level reset_states() works
-      model.reset_states()
-      out4 = model.predict(np.ones((num_samples, timesteps)))
-      self.assertAllClose(out3, out4, atol=1e-5)
-
-      # check that the call to `predict` updated the states
-      out5 = model.predict(np.ones((num_samples, timesteps)))
-      self.assertNotEqual(out4.max(), out5.max())
-
-      # Check masking
-      layer.reset_states()
-
-      left_padded_input = np.ones((num_samples, timesteps))
-      left_padded_input[0, :1] = 0
-      left_padded_input[1, :2] = 0
-      out6 = model.predict(left_padded_input)
-
-      layer.reset_states()
-
-      right_padded_input = np.ones((num_samples, timesteps))
-      right_padded_input[0, -1:] = 0
-      right_padded_input[1, -2:] = 0
-      out7 = model.predict(right_padded_input)
-
-      self.assertAllClose(out7, out6, atol=1e-5)
+    right_padded_input = np.ones((num_samples, timesteps))
+    right_padded_input[0, -1:] = 0
+    right_padded_input[1, -2:] = 0
+    out7 = model.predict(right_padded_input)
 
-  def test_regularizers_LSTM(self):
-    embedding_dim = 4
-    layer_class = keras.layers.UnifiedLSTM
-    with self.cached_session(config=_config):
-      layer = layer_class(
-          5,
-          return_sequences=False,
-          weights=None,
-          input_shape=(None, embedding_dim),
-          kernel_regularizer=keras.regularizers.l1(0.01),
-          recurrent_regularizer=keras.regularizers.l1(0.01),
-          bias_regularizer='l2',
-          activity_regularizer='l1')
-      layer.build((None, None, 2))
-      self.assertEqual(len(layer.losses), 3)
-      x = keras.backend.variable(np.ones((2, 3, 2)))
-      layer(x)
-      self.assertEqual(len(layer.get_losses_for(x)), 1)
+    self.assertAllClose(out7, out6, atol=1e-5)
 
 
 class UnifiedLSTMPerformanceTest(test.Benchmark):