    Run Python OP tests in a single Python process to improve test time. (#8362) · cde6241a
    Committed by Xin Pan
    Currently our tests run with 2 GPUs, and the initialization time is
    absurdly long: about 4s per process. On top of that, each OP test
    runs in its own process. This PR:
    
    1. Creates a CMake function, py_test_modules, which generates the
    Makefile target that runs a list of Python unittest modules in a
    single Python process (see the sketch after this list).
    
    2. Moves all "unittest-compatible" test files (i.e., files that use
    the unittest package, not just regular Python scripts) from
    fluid/tests to fluid/tests/unittests.
    
    3. CMake now runs all OP tests in fluid/tests/unittests in a single
    process, except for the time-consuming tests, which are split into
    separate processes to exploit parallelism. Please make sure to use
    the unittest package for any Python test file you put in
    fluid/tests/unittests.
    
    4. Removes all exit(0) calls from fluid/tests/unittests/*.py.
    exit(0) was used to disable a unittest, but when all tests run in a
    single process it terminates that process without running the
    remaining tests. Tests are instead disabled in
    fluid/tests/unittests/CMakeLists.txt, with a FIXME added for each
    disabled item. For any Python file in fluid/tests/unittests/, please
    disable its unittest from fluid/tests/unittests/CMakeLists.txt
    rather than adding exit(0) to the Python file.
    
    5. Adds an option WITH_FAST_BUNDLE_TEST. When it is OFF, the unit
    tests run in separate processes so that they can be tested
    individually.
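    The generated runner itself is not shown on this page; as a rough,
    hypothetical sketch (the module names are placeholders, not the
    actual test list), running a list of unittest modules in one Python
    process can look like this:

        import sys
        import unittest

        def run_modules(module_names):
            """Load and run several unittest modules in one process."""
            loader = unittest.TestLoader()
            suite = unittest.TestSuite()
            for name in module_names:
                # All modules share this process, so the ~4s GPU init
                # cost is paid once instead of once per test file.
                suite.addTests(loader.loadTestsFromName(name))
            result = unittest.TextTestRunner().run(suite)
            sys.exit(0 if result.wasSuccessful() else 1)

        if __name__ == "__main__":
            # e.g. python runner.py test_mul_op test_gru_op
            run_modules(sys.argv[1:])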
test_gru_op.py 6.0 KB
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import numpy as np
import math
from op_test import OpTest
from test_lstm_op import identity, sigmoid, tanh, relu


class TestGRUOp(OpTest):
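    # Offset-based LoD: [0, 2, 6, 9] describes three sequences of
    # lengths 2, 4 and 3; batch_size is the total number of steps (9).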
    lod = [[0, 2, 6, 9]]
    batch_size = lod[0][-1]
    frame_size = 5
    activate = {
        'identity': identity,
        'sigmoid': sigmoid,
        'tanh': tanh,
        'relu': relu
    }

    @staticmethod
    def seq_to_batch(lod, is_reverse):
        idx_in_seq_list = []
        seq_starts = lod[0]
        seq_lens = []
        for i in range(len(seq_starts) - 1):
            seq_lens.append(seq_starts[i + 1] - seq_starts[i])
        # Sort sequence indices by descending sequence length.
        sorted_seqs = sorted(range(len(seq_lens)), key=lambda x: -seq_lens[x])
        num_batch = seq_lens[sorted_seqs[0]]
        for batch_idx in range(num_batch):
            idx_in_seq = []
            for i in range(len(seq_lens)):
                if seq_lens[sorted_seqs[i]] <= batch_idx:
                    break
                idx = (seq_starts[sorted_seqs[i] + 1] - 1 - batch_idx
                       ) if is_reverse else (
                           seq_starts[sorted_seqs[i]] + batch_idx)
                idx_in_seq.append(idx)
            idx_in_seq_list.append(idx_in_seq)
        return idx_in_seq_list, sorted_seqs

    def gru_step(self, x, h_p, w, b):
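        """One reference GRU step over the currently active sequences.

        With the gate input g = x + b split into [update, reset,
        candidate] parts of width frame_size each:
            u = act_gate(h_p . W_u + g_u)        # update gate
            r = act_gate(h_p . W_r + g_r)        # reset gate
            c = act_cand((r * h_p) . W_c + g_c)  # candidate state
            h = u * c + (1 - u) * h_p            # new hidden state
        """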
        batch_size = x.shape[0]
        frame_size = w.shape[0]
        g = x + np.tile(b, (batch_size, 1))
        w_u_r = w.flatten()[:frame_size * frame_size * 2].reshape(
            (frame_size, frame_size * 2))
        u_r = self.activate[self.attrs['gate_activation']](np.dot(
            h_p, w_u_r) + g[:, :frame_size * 2])
        u = u_r[:, :frame_size]
        r = u_r[:, frame_size:frame_size * 2]
        r_h_p = r * h_p
        w_c = w.flatten()[frame_size * frame_size * 2:].reshape(
            (frame_size, frame_size))
        c = self.activate[self.attrs['activation']](np.dot(r_h_p, w_c) +
                                                    g[:, frame_size * 2:])
        g = np.hstack((u_r, c))
        h = u * c + (1 - u) * h_p
        return g, r_h_p, h

    def gru(self):
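        """Reference GRU over the whole batch of sequences.

        Walks the time steps produced by seq_to_batch, feeding each
        step's hidden state into the next, and fills the Batch* and
        Hidden outputs in place.
        """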
        input, lod = self.inputs['Input']
        w = self.inputs['Weight']
        b = self.inputs['Bias'] if 'Bias' in self.inputs else np.zeros(
            (1, self.frame_size * 3))
        batch_gate = self.outputs['BatchGate']
        batch_reset_hidden_prev = self.outputs['BatchResetHiddenPrev']
        batch_hidden = self.outputs['BatchHidden']
        hidden = self.outputs['Hidden']
        idx_in_seq_list = self.idx_in_seq_list
        h_p = self.inputs['H0'][self.sorted_seqs] if 'H0' in self.inputs \
            else np.zeros((len(idx_in_seq_list[0]), self.frame_size))
        num_batch = len(idx_in_seq_list)
        end_idx = 0
        for batch_idx in range(num_batch):
            x = input[idx_in_seq_list[batch_idx]]
            g, r_h_p, h = self.gru_step(x, h_p, w, b)
            if batch_idx < (num_batch - 1):
                h_p = h[:len(idx_in_seq_list[batch_idx + 1])]
            start_idx = end_idx
            end_idx = start_idx + len(idx_in_seq_list[batch_idx])
            batch_gate[start_idx:end_idx] = g
            batch_reset_hidden_prev[start_idx:end_idx] = r_h_p
            batch_hidden[start_idx:end_idx] = h
            hidden[idx_in_seq_list[batch_idx]] = h
        return batch_gate, batch_reset_hidden_prev, hidden

    def set_data(self):
        lod = self.lod
        self.idx_in_seq_list, self.sorted_seqs = self.seq_to_batch(
            lod, self.is_reverse)
        batch_size = self.batch_size
        frame_size = self.frame_size
        input = np.random.rand(batch_size, frame_size * 3).astype('float64')
        h0 = np.random.rand(len(self.idx_in_seq_list[0]),
                            frame_size).astype('float64')
        weight = np.random.rand(frame_size, frame_size * 3).astype('float64')
        bias = np.random.rand(1, frame_size * 3).astype('float64')

        self.inputs = {
            'Input': (input, lod),
            'H0': h0,
            'Weight': weight,
            'Bias': bias
        }

        self.outputs = {
            'BatchGate': np.zeros(
                (batch_size, frame_size * 3), dtype='float64'),
            'BatchResetHiddenPrev': np.zeros(
                (batch_size, frame_size), dtype='float64'),
            'BatchHidden': np.zeros(
                (batch_size, frame_size), dtype='float64'),
            'Hidden': np.zeros(
                (batch_size, frame_size), dtype='float64')
        }

    def set_confs(self):
        self.is_reverse = False
        self.attrs = {
            'activation': 'tanh',
            'gate_activation': 'sigmoid',
            'is_reverse': self.is_reverse
        }

    def setUp(self):
        self.op_type = "gru"
        self.set_confs()
        self.set_data()
        self.gru()

    def test_check_output(self):
        self.check_output()

    def test_check_grad(self):
        self.check_grad(['Input', 'H0', 'Weight', 'Bias'], ['Hidden'])


class TestGRUOpNoInitial(TestGRUOp):
    def set_data(self):
        super(TestGRUOpNoInitial, self).set_data()
        self.inputs.pop('H0')

    def test_check_grad(self):
        self.check_grad(['Input', 'Weight', 'Bias'], ['Hidden'])


class TestGRUOpReverse(TestGRUOp):
    def set_confs(self):
        self.is_reverse = True
        self.attrs = {
            'activation': 'tanh',
            'gate_activation': 'sigmoid',
            'is_reverse': self.is_reverse
        }


if __name__ == "__main__":
    unittest.main()