import logging
import paddle.v2.framework.core as core
import unittest
import numpy as np
from paddle.v2.framework.op import Operator


def py_sigmoid(x):
    return 1. / (1 + np.exp(-x))


class PySimpleRNN(object):
    '''
    A simple numpy-based RNN implementation, used as a reference to further
    test RecurrentOp's algorithm.
    '''
    def __init__(self,
                 input_dim=30,
                 batch_size=50,
                 weight_dim=15,
                 sent_len=11):
        self.x = np.random.normal(size=(sent_len, batch_size, input_dim))
        self.W = np.random.normal(size=(input_dim, input_dim))
        self.U = np.random.normal(size=(input_dim, input_dim))
        self.h_boot = np.random.normal(size=(batch_size, input_dim))

        # memories
        self.mems = [
            np.zeros(shape=(batch_size, input_dim)) for _ in range(sent_len)
        ]

    def forward(self):
        xs = self.segment_inputs()
        for step_id in range(self.x.shape[0]):
            self.step(step_id, xs[step_id])
        return self.concat_outputs()

    def segment_inputs(self):
        return [self.x[i] for i in range(self.x.shape[0])]

    def concat_outputs(self):
        return np.array(self.mems)

    def step(self, step_id, x):
        '''
        Run a single step: h_t = sigmoid(x_t W + h_{t-1} U)
        '''
        # use the previous memory, or the boot memory at step 0
        if step_id > 0:
            pre_mem = self.mems[step_id - 1]
        else:
            pre_mem = self.h_boot
        xW = np.matmul(x, self.W)
        hU = np.matmul(pre_mem, self.U)

        total = xW + hU
        self.mems[step_id] = py_sigmoid(total)


class PySimpleRNNTest(unittest.TestCase):
    def setUp(self):
        self.rnn = PySimpleRNN()

    def test_forward(self):
        output = self.rnn.forward()
        print('output', output)
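
    # Added sketch: a minimal shape check on the reference implementation.
    # The expected shape simply restates PySimpleRNN's defaults
    # (sent_len=11, batch_size=50, input_dim=30).
    def test_output_shape(self):
        output = self.rnn.forward()
        self.assertEqual(output.shape, (11, 50, 30))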


def create_tensor(scope, name, shape):
    tensor = scope.new_var(name).get_tensor()
    tensor.set_dims(shape)
    tensor.set(np.random.random(shape), core.CPUPlace())
    return tensor
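

def fetch_tensor(scope, name):
    # Hedged helper sketch, not part of the original test: read a variable's
    # tensor back from the scope as a numpy array. It assumes the Scope
    # binding exposes find_var() and that the bound tensor type converts via
    # np.array(); if the bindings differ, swap in the matching accessors.
    return np.array(scope.find_var(name).get_tensor())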


class TestRecurrentOp(unittest.TestCase):
    r'''
    Test RecurrentOp

    equation:
        h_t = \sigma (W x_t + U h_{t-1})
    weights:
        - W
        - U
    vars:
        - x
    memories:
        - h
    outputs:
        - h
    '''

    input_dim = 30
    batch_size = 50
    weight_dim = 15
    sent_len = 11

    def forward(self):
        self.scope = core.Scope()

        self.create_global_variables()
        self.create_step_net()
        rnn_op = self.create_rnn_op()
        ctx = core.DeviceContext.create(core.CPUPlace())
        print('infer_shape')
        rnn_op.infer_shape(self.scope)
        rnn_op.run(self.scope, ctx)
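        # Hedged sketch, not in the original test: return the outlink "h" so
        # tests can inspect it; relies on the fetch_tensor helper above.
        return fetch_tensor(self.scope, "h")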

    def create_global_variables(self):
        # create inlink
        create_tensor(self.scope, "x",
                      [self.sent_len, self.batch_size, self.input_dim])
        create_tensor(self.scope, "W", [self.input_dim, self.input_dim])
        create_tensor(self.scope, "U", [self.input_dim, self.input_dim])
        create_tensor(self.scope, "h_boot", [self.batch_size, self.input_dim])
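        # "step_scopes" will hold the per-step sub-scopes created by
        # RecurrentOp; "h@alias" is the step-local view of the outlink "h"
        # (see the *_alias attributes passed in create_rnn_op below).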
        self.scope.new_var("step_scopes")
        self.scope.new_var("h@alias")
        self.scope.new_var("h")

    def create_rnn_op(self):
        # create RNNOp
        rnnop = Operator(
            "recurrent_op",
            # inputs
            inlinks=["x"],
            boot_memories=["h_boot"],
            step_net="stepnet",
            # outputs
            outlinks=["h"],
            step_scopes="step_scopes",
            # attributes
            inlink_alias=["x@alias"],
            outlink_alias=["h@alias"],
            pre_memories=["h@pre"],
            memories=["h@alias"])
        return rnnop

    def create_step_net(self):
        var = self.scope.new_var("stepnet")
        stepnet = var.get_net()
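
        # The step net realizes h_t = sigmoid(x_t W + h_{t-1} U): two
        # fully-connected projections, an element-wise add, and a sigmoid.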
        x_fc_op = Operator("fc", X="x@alias", W="W", Y="Wx")
        h_fc_op = Operator("fc", X="h@pre", W="U", Y="Uh")
        sum_op = Operator("add_two", X="Wx", Y="Uh", Out="sum")
        sig_op = Operator("sigmoid", X="sum", Y="h@alias")

        for op in [x_fc_op, h_fc_op, sum_op, sig_op]:
            stepnet.add_op(op)
        stepnet.complete_add_op(True)

    def test_forward(self):
        print('test recurrent op forward')
        self.forward()
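
    # Hedged follow-up check, an added sketch: if fetch_tensor behaves as
    # assumed, the outlink "h" should hold one slice per time step.
    def test_output_shape(self):
        output = self.forward()
        self.assertEqual(
            output.shape, (self.sent_len, self.batch_size, self.input_dim))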


if __name__ == '__main__':
    unittest.main()