#   Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import sys
import unittest

sys.path.append('../../python/paddle/fluid/tests/unittests')

import numpy as np
from op_test_xpu import XPUOpTest

import paddle
from paddle.fluid import core

sys.path.append('../rnn')
from convert import get_params_for_net
from get_test_cover_info import (
    XPUOpTestWrapper,
    create_test_class,
    get_xpu_op_support_types,
)
from rnn_numpy import LSTM

random.seed(2)
np.set_printoptions(threshold=np.inf)
paddle.enable_static()


class XPUTestRNNOp(XPUOpTestWrapper):
    def __init__(self):
        self.op_name = 'rnn'
        self.use_dynamic_create_class = False

    class TestRNNOp(XPUOpTest):
        def setUp(self):
            self.init_size()
            self.init_dtype()
            self.op_type = "rnn"
            self.place = paddle.XPUPlace(0)
            self.sequence_length = np.array([12, 11, 10, 9, 8], dtype=np.int32)
            self.num_layers = 1
            self.is_bidirec = False
            self.set_attrs()
            self.mode = "LSTM"
            self.is_test = False
            self.dropout = 0.0

            self.direction_num = 2 if self.is_bidirec else 1
            direction = "bidirectional" if self.is_bidirec else "forward"

            input = np.random.uniform(
                low=-0.1,
                high=0.1,
                size=(self.seq_length, self.batch_size, self.input_size),
            ).astype(self.dtype)
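            # Zero out the time steps beyond each sample's sequence length
            # (sequence_length = [12, 11, 10, 9, 8]) so padding does not
            # contribute to the reference computation.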
            input[11][1:][:] = 0
            input[10][2:][:] = 0
            input[9][3:][:] = 0
            input[8][4:][:] = 0

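            # NumPy reference LSTM (from rnn_numpy) used to compute the
            # expected outputs and to supply the flattened weight list.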
            rnn1 = LSTM(
                self.input_size,
                self.hidden_size,
                num_layers=self.num_layers,
                time_major=True,
                direction=direction,
                dropout=self.dropout,
                dtype=self.dtype,
            )

            flat_w = get_params_for_net(rnn1)
            output, (last_hidden, last_cell) = rnn1(
                input, sequence_length=self.sequence_length
            )

            init_h = np.zeros(
                (
                    self.num_layers * self.direction_num,
                    self.batch_size,
                    self.hidden_size,
                )
            ).astype(self.dtype)
            init_c = np.zeros(
                (
                    self.num_layers * self.direction_num,
                    self.batch_size,
                    self.hidden_size,
                )
            ).astype(self.dtype)
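            # Opaque buffer backing the DropoutState output; its contents
            # are not compared (see no_check_set in test_check_output).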
            state_out = np.ndarray(300).astype("uint8")

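            # Operator inputs; SequenceLength is dropped below when a
            # subclass sets self.sequence_length to None.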
            self.inputs = {
                'Input': input,
                'WeightList': flat_w,
                'PreState': [('init_h', init_h), ('init_c', init_c)],
                'SequenceLength': self.sequence_length,
            }
            if self.sequence_length is None:
                self.inputs = {
                    'Input': input,
                    'WeightList': flat_w,
                    'PreState': [('init_h', init_h), ('init_c', init_c)],
                }
            self.attrs = {
                'dropout_prob': self.dropout,
                'is_bidirec': self.is_bidirec,
                'input_size': self.input_size,
                'hidden_size': self.hidden_size,
                'num_layers': self.num_layers,
                'mode': self.mode,
                'is_test': self.is_test,
            }
            self.outputs = {
                'Out': output,
                "State": [
                    ('last_hidden', last_hidden),
                    ('last_cell', last_cell),
                ],
                'Reserve': np.ndarray(400).astype("uint8"),
                'DropoutState': state_out,
            }

        def init_dtype(self):
            self.dtype = self.in_type

        def set_xpu(self):
            self.__class__.use_xpu = True
            self.__class__.no_need_check_grad = False
            self.__class__.op_type = self.in_type

        def test_check_output(self):
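            # Reserve and DropoutState hold opaque workspace data, so they
            # are excluded from the output comparison.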
            self.check_output_with_place(
                self.place, atol=0.01, no_check_set=['Reserve', 'DropoutState']
            )

        def test_grad(self):
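            # Check gradients w.r.t. the input, the initial states, and
            # every weight/bias tensor of the network.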
            if not self.is_test:
                var_name_list = self.get_weight_names()
                grad_check_list = ['Input', 'init_h', 'init_c']
                grad_check_list.extend(var_name_list)
                self.check_grad_with_place(
                    self.place,
                    set(grad_check_list),
                    ['Out', 'last_hidden', 'last_cell'],
                )

        def init_size(self):
            self.seq_length = 12
            self.batch_size = 5
            self.input_size = 3
            self.hidden_size = 2

        def get_weight_names(self):
            weight_names = []
            for i in range(self.num_layers):
                for j in range(0, 2 * self.direction_num):
                    weight_names.append(f"{i}.weight_{j}")
            for i in range(self.num_layers):
                for j in range(0, 2 * self.direction_num):
                    weight_names.append(f"{i}.bias_{j}")
            return weight_names

        def set_attrs(self):
            pass

    class TestRNNOp1(TestRNNOp):
        def set_attrs(self):
            self.sequence_length = None

    class TestRNNOp2(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 1
            self.is_bidirec = True

    class TestRNNOp3(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 2
            self.is_bidirec = False

    class TestRNNOp4(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 3
            self.is_bidirec = False

    class TestRNNOp5(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 2
            self.is_bidirec = True

    class TestRNNOp6(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 2
            self.is_bidirec = True
            self.sequence_length = None

    class TestRNNOp7(TestRNNOp):
        def set_attrs(self):
            self.num_layers = 3
            self.is_bidirec = True


support_types = get_xpu_op_support_types('rnn')
for stype in support_types:
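    # Register a concrete test class for this dtype; XPU1 devices are
    # excluded via ignore_device_version.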
    create_test_class(
        globals(),
        XPUTestRNNOp,
        stype,
        ignore_device_version=[core.XPUVersion.XPU1],
    )

if __name__ == '__main__':
    unittest.main()