# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np
from op_test import OpTest
from paddle import fluid


def _accumulate_row_conv(seq_in, seq_out, wt):
    """Add the lookahead (row) convolution of one sequence into ``seq_out``.

    For every timestep ``j`` and filter tap ``k`` such that ``j + k`` is still
    inside the sequence, ``seq_in[j + k] * wt[k]`` is added to ``seq_out[j]``.

    Args:
        seq_in: array indexed as ``[timestep, feature]``.
        seq_out: array shaped like ``seq_in``; it is updated in place.
        wt: filter indexed as ``[tap, feature]``.
    """
    timesteps = seq_in.shape[0]
    # Taps at or beyond the sequence length can never satisfy
    # j + k < timesteps, so they contribute nothing.
    for k in range(min(wt.shape[0], timesteps)):
        # One slice update per tap. Every output row still receives its
        # contributions in increasing-k order, exactly as an explicit
        # per-timestep loop would add them.
        seq_out[:timesteps - k] += seq_in[k:] * wt[k]


def row_conv_forward(x, lod, wt):
    """NumPy reference for ``row_conv`` on sequences packed along axis 0.

    Args:
        x: array whose rows are the timesteps of all sequences laid end to
            end.
        lod: nested list; ``lod[0]`` holds the length of each sequence.
        wt: filter whose first axis is the context length.

    Returns:
        A new array shaped like ``x``. Each sequence is convolved
        independently, so no timestep reads data from a neighbouring
        sequence.
    """
    out = np.zeros_like(x)
    start = 0
    for seq_len in lod[0]:
        end = start + seq_len
        # Basic slices are views, so the helper writes straight into ``out``.
        _accumulate_row_conv(x[start:end, :], out[start:end, :], wt)
        start = end
    return out


class TestRowConvOp1(OpTest):
    """row_conv on LoD input: sequences of length 2, 3, 2 with D=16."""

    def setUp(self):
        self.op_type = "row_conv"
        lod = [[2, 3, 2]]
        T = sum(lod[0])
        D = 16
        context_length = 8

        x = np.random.random((T, D)).astype("float32")
        wt = np.random.random((context_length, D)).astype("float32")
        self.inputs = {'X': (x, lod), 'Filter': wt}

        out = row_conv_forward(x, lod, wt)
        self.outputs = {'Out': (out, lod)}

    def test_check_output(self):
        self.check_output(check_dygraph=False)

    def test_check_grad_normal(self):
        self.check_grad(['X', 'Filter'], 'Out', check_dygraph=False)

    def test_check_grad_ignore_x(self):
        self.check_grad(
            ['Filter'], 'Out', no_grad_set={'X'}, check_dygraph=False)

    def test_check_grad_ignore_wt(self):
        # A set literal is required: set('Filter') would iterate the string
        # and produce the single characters instead of the variable name.
        self.check_grad(
            ['X'], 'Out', no_grad_set={'Filter'}, check_dygraph=False)


class TestRowConvOp2(OpTest):
    """row_conv on larger LoD input: lengths 20, 30, 50 with D=35."""

    def setUp(self):
        self.op_type = "row_conv"
        lod = [[20, 30, 50]]
        T = sum(lod[0])
        D = 35
        context_length = 35

        x = np.random.random((T, D)).astype("float32")
        wt = np.random.random((context_length, D)).astype("float32")
        self.inputs = {'X': (x, lod), 'Filter': wt}

        out = row_conv_forward(x, lod, wt)
        self.outputs = {'Out': (out, lod)}

    def test_check_output(self):
        self.check_output(check_dygraph=False)

    # max_relative_error is increased from 0.05 to 0.06 because, for
    # higher-dimensional input, the dX on CPU has a max_rel_error slightly
    # above 0.05 for some values.
    def test_check_grad_normal(self):
        self.check_grad(
            ['X', 'Filter'],
            'Out',
            max_relative_error=0.06,
            check_dygraph=False)

    def test_check_grad_ignore_x(self):
        self.check_grad(
            ['Filter'],
            'Out',
            max_relative_error=0.06,
            no_grad_set={'X'},
            check_dygraph=False)

    def test_check_grad_ignore_wt(self):
        # Set literal, not set('Filter'), which would yield single characters.
        self.check_grad(
            ['X'],
            'Out',
            max_relative_error=0.06,
            no_grad_set={'Filter'},
            check_dygraph=False)


def row_conv_foward_Tensor(x, wt):
    """NumPy reference for ``row_conv`` on a dense batched input.

    The function name keeps its historical spelling so existing callers
    continue to work.

    Args:
        x: array indexed as ``[batch, timestep, feature]``.
        wt: filter whose first axis is the context length.

    Returns:
        A new array shaped like ``x``; every batch entry is convolved on its
        own over its full timestep axis.
    """
    out = np.zeros_like(x)
    # Iterating an ndarray yields views of its first-axis entries, so the
    # helper updates ``out`` in place.
    for seq_in, seq_out in zip(x, out):
        _accumulate_row_conv(seq_in, seq_out, wt)
    return out


class TestRowOpWithTensorInput(OpTest):
    """row_conv on a plain (B, T, D) tensor input without LoD."""

    def setUp(self):
        self.op_type = "row_conv"
        length = [1, 2, 3]
        B = 2
        T = sum(length)
        D = 20
        context_length = 6

        x = np.random.random((B, T, D)).astype("float32")
        wt = np.random.random((context_length, D)).astype("float32")
        self.inputs = {'X': x, 'Filter': wt}

        out = row_conv_foward_Tensor(x, wt)
        self.outputs = {'Out': out}

    def test_check_output(self):
        self.check_output(check_dygraph=False)

    def test_check_grad_ignore_x(self):
        self.check_grad(
            ['Filter'], 'Out', no_grad_set={'X'}, check_dygraph=False)

    def test_check_grad_normal(self):
        self.check_grad(['X', 'Filter'], 'Out', check_dygraph=False)

    def test_check_grad_ignore_wt(self):
        # Set literal, not set('Filter'), which would yield single characters.
        self.check_grad(
            ['X'], 'Out', no_grad_set={'Filter'}, check_dygraph=False)


class TestRowConvLayer(unittest.TestCase):
    """Compares ``fluid.layers.row_conv`` with the NumPy reference."""

    def setUp(self):
        self.B = 2
        self.T = 6
        self.C = 20
        self.context_length = 6

        self.x = np.random.random((self.B, self.T, self.C)).astype("float32")
        self.w = np.random.random(
            (self.context_length, self.C)).astype("float32")
        self.out = row_conv_foward_Tensor(self.x, self.w)

    # NOTE(review): this method has no ``test`` prefix, so unittest discovery
    # never runs it. Confirm whether that is intentional before renaming; the
    # default assert_allclose tolerance may be tight for float32 results.
    def check_identity(self):
        start = fluid.Program()
        main = fluid.Program()
        with fluid.unique_name.guard():
            with fluid.program_guard(main, start):
                x = fluid.data("x", (-1, -1, self.C), "float32")
                out = fluid.layers.row_conv(
                    x,
                    self.context_length,
                    param_attr=fluid.initializer.NumpyArrayInitializer(self.w))
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(start)
        out_np, = exe.run(main, feed={'x': self.x}, fetch_list=[out])
        np.testing.assert_allclose(out_np, self.out)


if __name__ == '__main__':
    unittest.main()