提交 493e1c04 编写于 作者: Q qingqing01 提交者: GitHub

Merge pull request #3335 from Superjom/rnn_forward_result_test

Rnn forward result test
......@@ -24,3 +24,4 @@ py_test(test_default_scope_funcs SRCS test_default_scope_funcs.py)
py_test(test_operator SRCS test_operator.py)
# py_test(test_gaussian_random_op SRCS test_gaussian_random_op.py)
py_test(test_uniform_random_op SRCS test_uniform_random_op.py)
py_test(test_recurrent_op SRCS test_recurrent_op.py)
......@@ -2,19 +2,74 @@ import logging
import paddle.v2.framework.core as core
import unittest
import numpy as np
import paddle.v2.framework.create_op_creation_methods as creation
from paddle.v2.framework.op import Operator
ops = creation.op_creations
def py_sigmoid(x):
return 1. / (1. + np.exp(-x))
def create_tensor(scope, name, shape):
class PySimpleRNN(object):
'''
A simple implementation of RNN based on numpy, to futhur test RecurrentOp's alogorithm
'''
def __init__(self, input_dim=30, batch_size=50, weight_dim=15, sent_len=11):
self.x = np.random.normal(size=(sent_len, batch_size, input_dim))
self.W = np.random.normal(size=(input_dim, input_dim))
self.U = np.random.normal(size=(input_dim, input_dim))
self.h_boot = np.random.normal(size=(batch_size, input_dim))
# memories
self.mems = [
np.zeros(shape=(batch_size, input_dim)) for i in range(sent_len)
]
def forward(self):
xs = self.segment_inputs()
for step_id in range(self.x.shape[0]):
self.step(step_id, xs[step_id])
return self.concat_outputs()
def segment_inputs(self):
return [self.x[i] for i in range(self.x.shape[0])]
def concat_outputs(self):
return np.array(self.mems)
def step(self, step_id, x):
'''
run a step
'''
mem = self.mems[step_id]
if step_id > 0:
pre_mem = self.mems[step_id - 1]
else:
pre_mem = self.h_boot
xW = np.matmul(x, self.W)
hU = np.matmul(mem, self.U)
sum = xW + hU
self.mems[step_id] = py_sigmoid(sum)
class PySimpleRNNTest(unittest.TestCase):
def setUp(self):
self.rnn = PySimpleRNN()
def test_forward(self):
output = self.rnn.forward()
print 'output', output
def create_tensor(scope, name, shape, np_data):
tensor = scope.new_var(name).get_tensor()
tensor.set_dims(shape)
tensor.set(np.random.random(shape), core.CPUPlace())
tensor.set(np_data, core.CPUPlace())
return tensor
class TestRNN(unittest.TestCase):
class TestRecurrentOp(unittest.TestCase):
'''
Test RNNOp
......@@ -36,33 +91,45 @@ class TestRNN(unittest.TestCase):
weight_dim = 15
sent_len = 11
def init(self):
def setUp(self):
self.py_rnn = PySimpleRNN(self.input_dim, self.batch_size,
self.weight_dim, self.sent_len)
def forward(self):
self.scope = core.Scope()
self.create_global_variables()
self.create_step_net()
rnn_op = self.create_rnn_op()
ctx = core.DeviceContext.create(core.CPUPlace())
print 'infer_shape'
rnn_op.infer_shape(self.scope)
rnn_op.run(self.scope, ctx)
return np.array(self.scope.find_var("h").get_tensor())
def create_global_variables(self):
# create inlink
x_np_data = self.py_rnn.x
create_tensor(self.scope, "x",
[self.sent_len, self.batch_size, self.input_dim])
create_tensor(self.scope, "W", [self.input_dim, self.input_dim])
create_tensor(self.scope, "U", [self.input_dim, self.input_dim])
create_tensor(self.scope, "h_boot", [self.batch_size, self.input_dim])
[self.sent_len, self.batch_size, self.input_dim],
x_np_data)
W_np_data = self.py_rnn.W
create_tensor(self.scope, "W", [self.input_dim, self.input_dim],
W_np_data)
U_np_data = self.py_rnn.U
create_tensor(self.scope, "U", [self.input_dim, self.input_dim],
U_np_data)
h_boot_np_data = self.py_rnn.h_boot
create_tensor(self.scope, "h_boot", [self.batch_size, self.input_dim],
h_boot_np_data)
self.scope.new_var("step_scopes")
self.scope.new_var("h@alias")
self.scope.new_var("h")
def create_rnn_op(self):
# create RNNOp
rnnop = ops.recurrent_op(
rnnop = Operator(
"recurrent_op",
# inputs
inlinks=["x"],
boot_memories=["h_boot"],
......@@ -81,17 +148,25 @@ class TestRNN(unittest.TestCase):
var = self.scope.new_var("stepnet")
stepnet = var.get_net()
x_fc_op = ops.fc(X="x@alias", W="W", Y="Wx")
h_fc_op = ops.fc(X="h@pre", W="U", Y="Uh")
sum_op = ops.add_two(X="Wx", Y="Uh", Out="sum")
sig_op = ops.sigmoid(X="sum", Y="h@alias")
# x_fc_op = Operator("fc", X="x@alias", W="W", Y="Wx")
# h_fc_op = Operator("fc", X="h@pre", W="U", Y="Uh")
x_fc_op = Operator("mul", X="x@alias", Y="W", Out="Wx")
h_fc_op = Operator("mul", X="h@pre", Y="U", Out="Uh")
sum_op = Operator("add_two", X="Wx", Y="Uh", Out="sum")
sig_op = Operator("sigmoid", X="sum", Y="h@alias")
for op in [x_fc_op, h_fc_op, sum_op, sig_op]:
stepnet.add_op(op)
stepnet.complete_add_op(True)
def test_recurrent(self):
self.init()
def test_forward(self):
print 'test recurrent op forward'
pd_output = self.forward()
py_output = self.py_rnn.forward()
print 'pd_output', pd_output
print
print 'py_output', py_output
self.assertEqual(pd_output.shape, py_output.shape)
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册