test_recurrent_op.py 6.0 KB
Newer Older
Y
Yan Chunwei 已提交
1
import logging
Y
Yan Chunwei 已提交
2 3 4
import paddle.v2.framework.core as core
import unittest
import numpy as np
Y
Yan Chunwei 已提交
5
from paddle.v2.framework.op import Operator, RecurrentOp
S
superjom 已提交
6
from op_test import get_numeric_gradient
Y
Yan Chunwei 已提交
7

S
superjom 已提交
8 9

def py_sigmoid(x):
S
superjom 已提交
10
    return 1. / (1. + np.exp(-x))
S
superjom 已提交
11

S
fix res  
superjom 已提交
12

S
superjom 已提交
13 14 15 16
class PySimpleRNN(object):
    '''
    A simple implementation of RNN based on numpy, to futhur test RecurrentOp's alogorithm
    '''
S
fix res  
superjom 已提交
17 18

    def __init__(self, input_dim=30, batch_size=50, weight_dim=15, sent_len=11):
Q
qiaolongfei 已提交
19 20 21 22 23 24
        self.x = np.random.normal(size=(sent_len, batch_size,
                                        input_dim)).astype("float32")
        self.W = np.random.normal(size=(input_dim, input_dim)).astype("float32")
        self.U = np.random.normal(size=(input_dim, input_dim)).astype("float32")
        self.h_boot = np.random.normal(size=(batch_size,
                                             input_dim)).astype("float32")
S
superjom 已提交
25 26

        # memories
S
fix res  
superjom 已提交
27
        self.mems = [
Q
qiaolongfei 已提交
28 29
            np.zeros(shape=(batch_size, input_dim)).astype("float32")
            for i in range(sent_len)
S
fix res  
superjom 已提交
30
        ]
S
superjom 已提交
31 32 33 34 35 36 37 38 39 40 41

    def forward(self):
        xs = self.segment_inputs()
        for step_id in range(self.x.shape[0]):
            self.step(step_id, xs[step_id])
        return self.concat_outputs()

    def segment_inputs(self):
        return [self.x[i] for i in range(self.x.shape[0])]

    def concat_outputs(self):
Q
qiaolongfei 已提交
42
        return np.array(self.mems).astype("float32")
S
superjom 已提交
43 44 45 46 47 48 49

    def step(self, step_id, x):
        '''
        run a step
        '''
        mem = self.mems[step_id]
        if step_id > 0:
S
fix res  
superjom 已提交
50
            pre_mem = self.mems[step_id - 1]
S
superjom 已提交
51 52
        else:
            pre_mem = self.h_boot
Q
qiaolongfei 已提交
53 54
        xW = np.matmul(x, self.W).astype("float32")
        hU = np.matmul(pre_mem, self.U).astype("float32")
S
superjom 已提交
55 56 57 58

        sum = xW + hU
        self.mems[step_id] = py_sigmoid(sum)

S
fix res  
superjom 已提交
59

S
superjom 已提交
60 61 62 63 64 65
class PySimpleRNNTest(unittest.TestCase):
    def setUp(self):
        self.rnn = PySimpleRNN()

    def test_forward(self):
        output = self.rnn.forward()
Y
Yan Chunwei 已提交
66 67


S
superjom 已提交
68
def create_tensor(scope, name, shape, np_data):
D
dongzhihong 已提交
69
    tensor = scope.var(name).get_tensor()
Y
Yan Chunwei 已提交
70
    tensor.set_dims(shape)
S
superjom 已提交
71
    tensor.set(np_data, core.CPUPlace())
Y
Yan Chunwei 已提交
72 73 74
    return tensor


S
init  
superjom 已提交
75
class RecurrentOpTest(unittest.TestCase):
Y
Yan Chunwei 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88
    '''
    Test RNNOp

    equation:
        h_t = \sigma (W x_t + U h_{t-1})
    weights:
        - W
        - U
    vars:
        - x
    memories:
        - h
    outputs:
S
superjom 已提交
89
       - h
Y
Yan Chunwei 已提交
90 91
    '''

Y
Yan Chunwei 已提交
92 93 94 95 96
    input_dim = 30
    batch_size = 50
    weight_dim = 15
    sent_len = 11

S
superjom 已提交
97
    def setUp(self):
S
fix res  
superjom 已提交
98 99
        self.py_rnn = PySimpleRNN(self.input_dim, self.batch_size,
                                  self.weight_dim, self.sent_len)
Y
Yan Chunwei 已提交
100

S
superjom 已提交
101 102
    def forward(self):
        self.scope = core.Scope()
Y
Yan Chunwei 已提交
103
        self.create_global_variables()
Y
Yan Chunwei 已提交
104
        self.create_rnn_op()
Y
Yan Chunwei 已提交
105 106
        self.create_step_net()
        ctx = core.DeviceContext.create(core.CPUPlace())
Y
Yan Chunwei 已提交
107
        self.rnnop.run(self.scope, ctx)
Q
qiaolongfei 已提交
108 109
        return np.array(self.scope.find_var("h@mem").get_tensor()).astype(
            "float32")
Y
Yan Chunwei 已提交
110 111 112

    def create_global_variables(self):
        # create inlink
S
superjom 已提交
113
        x_np_data = self.py_rnn.x
Y
Yan Chunwei 已提交
114
        create_tensor(self.scope, "x",
S
fix res  
superjom 已提交
115 116
                      [self.sent_len, self.batch_size, self.input_dim],
                      x_np_data)
S
superjom 已提交
117
        W_np_data = self.py_rnn.W
S
fix res  
superjom 已提交
118 119
        create_tensor(self.scope, "W", [self.input_dim, self.input_dim],
                      W_np_data)
S
superjom 已提交
120 121

        U_np_data = self.py_rnn.U
S
fix res  
superjom 已提交
122 123
        create_tensor(self.scope, "U", [self.input_dim, self.input_dim],
                      U_np_data)
S
superjom 已提交
124 125

        h_boot_np_data = self.py_rnn.h_boot
S
fix res  
superjom 已提交
126 127
        create_tensor(self.scope, "h_boot", [self.batch_size, self.input_dim],
                      h_boot_np_data)
D
dongzhihong 已提交
128 129
        self.scope.var("step_scopes")
        self.scope.var("h@mem")
Y
Yan Chunwei 已提交
130 131

    def create_rnn_op(self):
Y
Yan Chunwei 已提交
132
        # create RNNOp
Y
Yan Chunwei 已提交
133
        self.rnnop = RecurrentOp(
Y
Yan Chunwei 已提交
134
            # inputs
135 136
            inputs=["x"],
            initial_states=["h_boot"],
Y
Yan Chunwei 已提交
137 138
            step_net="stepnet",
            # outputs
139
            outputs=["h@mem"],
Y
Yan Chunwei 已提交
140 141
            step_scopes="step_scopes",
            # attributes
142 143
            ex_states=["h@pre"],
            states=["h@mem"])
Y
Yan Chunwei 已提交
144 145

    def create_step_net(self):
Y
Yan Chunwei 已提交
146
        stepnet = core.Net.create()
S
superjom 已提交
147
        x_fc_op = Operator("mul", X="x", Y="W", Out="Wx")
S
fix res  
superjom 已提交
148
        h_fc_op = Operator("mul", X="h@pre", Y="U", Out="Uh")
Q
qiaolongfei 已提交
149
        sum_op = Operator("sum", X=["Wx", "Uh"], Out="sum")
S
superjom 已提交
150
        sig_op = Operator("sigmoid", X="sum", Y="h@mem")
Y
Yan Chunwei 已提交
151 152

        for op in [x_fc_op, h_fc_op, sum_op, sig_op]:
Y
Yu Yang 已提交
153
            stepnet.append_op(op)
Y
Yan Chunwei 已提交
154
        stepnet.complete_add_op(True)
Y
Yan Chunwei 已提交
155
        self.rnnop.set_stepnet(stepnet)
Y
Yan Chunwei 已提交
156

S
superjom 已提交
157 158
    def test_forward(self):
        print 'test recurrent op forward'
S
superjom 已提交
159 160 161 162 163 164
        pd_output = self.forward()
        py_output = self.py_rnn.forward()
        print 'pd_output', pd_output
        print
        print 'py_output', py_output
        self.assertEqual(pd_output.shape, py_output.shape)
S
superjom 已提交
165
        self.assertTrue(np.isclose(pd_output, py_output, rtol=0.1).all())
Y
Yan Chunwei 已提交
166

S
fix res  
superjom 已提交
167

S
init  
superjom 已提交
168 169 170 171
class RecurrentGradientOpTest(unittest.TestCase):
    def create_forward_op(self):
        self.forward_op = RecurrentOp(
            # inputs
172 173
            inputs=["x"],
            initial_states=["h_boot"],
S
init  
superjom 已提交
174 175
            step_net="stepnet",
            # outputs
176
            outputs=["h"],
S
init  
superjom 已提交
177 178
            step_scopes="step_scopes",
            # attributes
179 180
            ex_states=["h@pre"],
            states=["h@alias"])
S
init  
superjom 已提交
181 182 183 184 185

        # create a stepnet for RNN
        stepnet = core.Net.create()
        x_fc_op = Operator("mul", X="x@alias", Y="W", Out="Wx")
        h_fc_op = Operator("mul", X="h@pre", Y="U", Out="Uh")
Q
qiaolongfei 已提交
186
        sum_op = Operator("sum", X=["Wx", "Uh"], Out="sum")
S
init  
superjom 已提交
187 188 189
        sig_op = Operator("sigmoid", X="sum", Y="h@alias")

        for op in [x_fc_op, h_fc_op, sum_op, sig_op]:
S
superjom 已提交
190
            stepnet.append_op(op)
S
init  
superjom 已提交
191 192 193 194 195 196 197 198 199 200 201 202
        stepnet.complete_add_op(True)
        self.forward_op.set_stepnet(stepnet)

    def create_gradient_op(self):
        a = set()
        backward_op = core.RecurrentOp.backward(self.forward_op, a)

    def test_grad(self):
        self.create_forward_op()
        self.create_gradient_op()


Y
Yan Chunwei 已提交
203 204
if __name__ == '__main__':
    unittest.main()