From 229c2e78833dc4574083de0935ad321ea7a72317 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Wed, 6 Dec 2017 10:31:06 +0800 Subject: [PATCH] Feature/while op sentiment analysis (#6282) * Add DataFeeder A v2 API like data feeder for book demos. We can feed data directly from reader. * Fix CI * Add an unittest for while/rnn op forward * Add unittest for raw while op backward * Fix CI * Complete Dynamic RNN --- paddle/framework/backward.cc | 4 +- python/paddle/v2/fluid/layer_helper.py | 7 + python/paddle/v2/fluid/layers.py | 221 ++++++++++++++++++- python/paddle/v2/fluid/tests/test_dyn_rnn.py | 45 +++- 4 files changed, 269 insertions(+), 8 deletions(-) diff --git a/paddle/framework/backward.cc b/paddle/framework/backward.cc index c8b85caaca2..7294ba1a9c5 100644 --- a/paddle/framework/backward.cc +++ b/paddle/framework/backward.cc @@ -33,8 +33,8 @@ static std::unordered_set* g_ctrl_flow_ops_ = nullptr; // We should design a better way to backward CtrlFlowOps. static std::unordered_set& CtrlFlowOps() { if (g_ctrl_flow_ops_ == nullptr) { - g_ctrl_flow_ops_ = - new std::unordered_set{"increment", "lod_rank_table"}; + g_ctrl_flow_ops_ = new std::unordered_set{ + "increment", "lod_rank_table", "less_than"}; } return *g_ctrl_flow_ops_; } diff --git a/python/paddle/v2/fluid/layer_helper.py b/python/paddle/v2/fluid/layer_helper.py index cbee3fe6371..3963e132223 100644 --- a/python/paddle/v2/fluid/layer_helper.py +++ b/python/paddle/v2/fluid/layer_helper.py @@ -151,6 +151,13 @@ class LayerHelper(object): persistable=True, initializer=initializer) + @property + def to_kwargs(self): + return { + 'main_program': self.main_program, + 'startup_program': self.startup_program + } + def append_bias_op(self, input_var, dim_start=1, dim_end=None): """ Append bias operator and return its output. If the user does not set diff --git a/python/paddle/v2/fluid/layers.py b/python/paddle/v2/fluid/layers.py index 3f7cd525b30..98a04ea9c25 100644 --- a/python/paddle/v2/fluid/layers.py +++ b/python/paddle/v2/fluid/layers.py @@ -6,6 +6,7 @@ from paddle.v2.fluid.layer_helper import LayerHelper, unique_name import re import cStringIO from param_attr import ParamAttr +import contextlib __all__ = [ 'fc', 'data', 'cross_entropy', 'conv2d', 'pool2d', 'embedding', 'concat', @@ -1395,7 +1396,7 @@ def lod_tensor_to_array(x, table, main_program=None): return array -def array_to_lod_tensor(x, table, main_program=None): +def array_to_lod_tensor(x, table, main_program=None, startup_program=None): """ This function creates an operator to convert an array to a LOD_Tensor. @@ -1476,7 +1477,11 @@ def zeros(shape, dtype, main_program=None): return fill_constant(value=0.0, **locals()) -def increment(x, value=1.0, in_place=True, main_program=None): +def increment(x, + value=1.0, + in_place=True, + main_program=None, + startup_program=None): """ This function creates an operator to increment each value in the input `x` by an amount: `value` as mentioned in the input parameter. This @@ -1495,7 +1500,7 @@ def increment(x, value=1.0, in_place=True, main_program=None): return out -def array_write(x, i, array=None, main_program=None): +def array_write(x, i, array=None, main_program=None, startup_program=None): """ This function creates an operator to write the data out as a LOD_TENSOR_ARRAY. 
@@ -1534,7 +1539,7 @@ def less_than(x, y, cond=None, main_program=None, **ignored):
     return cond
 
 
-def array_read(array, i, main_program=None):
+def array_read(array, i, main_program=None, startup_program=None):
     """
     This function creates an operator to read the data in as a
     LOD_TENSOR_ARRAY.
@@ -1553,7 +1558,7 @@ def array_read(array, i, main_program=None):
     return out
 
 
-def shrink_memory(x, i, table, main_program=None):
+def shrink_memory(x, i, table, main_program=None, startup_program=None):
     """
     This function creates an operator to shrink_rnn_memory using the RankTable
     as mentioned in the input parameter.
@@ -1890,3 +1895,209 @@ class IfElse(object):
                     main_program=self.helper.main_program,
                     startup_program=self.helper.startup_program))
         return rlist
+
+
+class DynamicRNN(object):
+    BEFORE_RNN = 0
+    IN_RNN = 1
+    AFTER_RNN = 2
+
+    def __init__(self, name=None, main_program=None, startup_program=None):
+        self.helper = LayerHelper(
+            'dynamic_rnn',
+            name=name,
+            main_program=main_program,
+            startup_program=startup_program)
+        self.status = DynamicRNN.BEFORE_RNN
+        self.lod_rank_table = None
+        self.max_seq_len = None
+        self.step_idx = None
+        self.zero_idx = fill_constant(shape=[1], value=0, dtype='int64')
+        self.mem_dict = dict()
+        self.output_array = []
+        self.outputs = []
+        self.cond = self.helper.create_tmp_variable(dtype='bool')
+        self.cond.stop_gradient = False
+        self.while_op = While(self.cond)
+        self.input_array = []
+        self.mem_link = []
+
+    def step_input(self, x):
+        self._assert_in_rnn_block_("step_input")
+        if not isinstance(x, Variable):
+            raise TypeError(
+                "step_input() can only take a Variable as its input")
+        parent_block = self._parent_block_()
+        if self.lod_rank_table is None:
+            self.lod_rank_table = parent_block.create_var(
+                name=unique_name('lod_rank_table'),
+                type=core.VarDesc.VarType.LOD_RANK_TABLE)
+            self.lod_rank_table.stop_gradient = True
+            parent_block.append_op(
+                type='lod_rank_table',
+                inputs={"X": x},
+                outputs={"Out": self.lod_rank_table})
+            self.max_seq_len = parent_block.create_var(
+                name=unique_name('dynamic_rnn_max_seq_len'), dtype='int64')
+            self.max_seq_len.stop_gradient = False
+            parent_block.append_op(
+                type='max_sequence_len',
+                inputs={'RankTable': self.lod_rank_table},
+                outputs={"Out": self.max_seq_len})
+            self.cond.stop_gradient = True
+            parent_block.append_op(
+                type='less_than',
+                inputs={'X': self.step_idx,
+                        'Y': self.max_seq_len},
+                outputs={'Out': self.cond})
+
+        input_array = parent_block.create_var(
+            name=unique_name('dynamic_rnn_input_array'),
+            type=core.VarDesc.VarType.LOD_TENSOR_ARRAY,
+            dtype=x.dtype)
+        self.input_array.append((input_array, x.dtype))
+        parent_block.append_op(
+            type='lod_tensor_to_array',
+            inputs={'X': x,
+                    'RankTable': self.lod_rank_table},
+            outputs={'Out': input_array})
+        return array_read(
+            array=input_array, i=self.step_idx, **self.helper.to_kwargs)
+
+    @contextlib.contextmanager
+    def block(self):
+        if self.status != DynamicRNN.BEFORE_RNN:
+            raise ValueError("rnn.block() can only be invoked once")
+        self.step_idx = fill_constant(shape=[1], dtype='int64', value=0)
+        self.step_idx.stop_gradient = False
+        self.status = DynamicRNN.IN_RNN
+        with self.while_op.block():
+            yield
+            increment(
+                x=self.step_idx,
+                value=1.0,
+                in_place=True,
+                **self.helper.to_kwargs)
+
+            for new_mem, mem_array in self.mem_link:
+                array_write(
+                    x=new_mem,
+                    i=self.step_idx,
+                    array=mem_array,
+                    **self.helper.to_kwargs)
+
+            less_than(
+                x=self.step_idx,
+                y=self.max_seq_len,
+                cond=self.cond,
+                **self.helper.to_kwargs)
+
+        self.status = 
DynamicRNN.AFTER_RNN + for each_array in self.output_array: + self.outputs.append( + array_to_lod_tensor( + x=each_array, + table=self.lod_rank_table, + **self.helper.to_kwargs)) + + def __call__(self, *args, **kwargs): + if self.status != DynamicRNN.AFTER_RNN: + raise ValueError( + "Dynamic RNN outputs can only be retrieved after rnn block") + if len(self.outputs) == 1: + return self.outputs[0] + else: + return self.outputs + + def memory(self, init=None, shape=None, value=0.0, dtype='float32'): + self._assert_in_rnn_block_('memory') + if init is not None: + if not isinstance(init, Variable): + raise TypeError( + "The input arg `init` of memory() must be a Variable") + parent_block = self._parent_block_() + mem_array = parent_block.create_var( + name=unique_name('dynamic_rnn_mem_array'), + type=core.VarDesc.VarType.LOD_TENSOR_ARRAY, + dtype=init.dtype) + parent_block.append_op( + type='write_to_array', + inputs={'X': init, + 'I': self.zero_idx}, + outputs={'Out': mem_array}) + retv = array_read( + array=mem_array, i=self.step_idx, **self.helper.to_kwargs) + retv = shrink_memory( + x=retv, + i=self.step_idx, + table=self.lod_rank_table, + **self.helper.to_kwargs) + self.mem_dict[retv.name] = mem_array + return retv + else: + if len(self.input_array) == 0: + raise ValueError( + "step_input should be invoked before memory(shape=..., value=...)" + ) + parent_block = self._parent_block_() + init = parent_block.create_var( + name=unique_name('mem_init'), dtype=dtype) + arr, dtype = self.input_array[0] + in0 = parent_block.create_var(name=unique_name('in0'), dtype=dtype) + parent_block.append_op( + type='read_from_array', + inputs={'X': [arr], + 'I': [self.zero_idx]}, + outputs={'Out': [in0]}) + parent_block.append_op( + type='fill_constant_batch_size_like', + inputs={'Input': [in0]}, + outputs={'Out': [init]}, + attrs={ + 'shape': [-1] + shape, + 'value': float(value), + 'dtype': init.dtype + }) + return self.memory(init=init) + + def update_memory(self, ex_mem, new_mem): + self._assert_in_rnn_block_('update_memory') + if not isinstance(ex_mem, Variable): + raise TypeError("The input arg `ex_mem` of update_memory() must " + "be a Variable") + if not isinstance(new_mem, Variable): + raise TypeError("The input arg `new_mem` of update_memory() must " + "be a Variable") + + mem_array = self.mem_dict.get(ex_mem.name, None) + if mem_array is None: + raise ValueError("Please invoke memory before update_memory") + if self.lod_rank_table is None: + raise ValueError("Please invoke step_input before update_memory") + + self.mem_link.append((new_mem, mem_array)) + + def output(self, *outputs): + self._assert_in_rnn_block_('output') + parent_block = self._parent_block_() + for each in outputs: + outside_array = parent_block.create_var( + name=unique_name("_".join( + [self.helper.name, "output_array", each.name])), + type=core.VarDesc.VarType.LOD_TENSOR_ARRAY, + dtype=each.dtype) + array_write(x=each, i=self.step_idx, array=outside_array) + self.output_array.append(outside_array) + + def _parent_block_(self): + prog = self.helper.main_program + parent_idx = prog.current_block().parent_idx + assert parent_idx >= 0 + parent_block = prog.block(parent_idx) + + return parent_block + + def _assert_in_rnn_block_(self, method): + if self.status != DynamicRNN.IN_RNN: + raise ValueError("{0} can only be invoked inside rnn block.".format( + method)) diff --git a/python/paddle/v2/fluid/tests/test_dyn_rnn.py b/python/paddle/v2/fluid/tests/test_dyn_rnn.py index 271e39a0e0f..034266c26f4 100644 --- 
a/python/paddle/v2/fluid/tests/test_dyn_rnn.py
+++ b/python/paddle/v2/fluid/tests/test_dyn_rnn.py
@@ -7,7 +7,7 @@ import numpy
 class TestDynRNN(unittest.TestCase):
     def setUp(self):
         self.word_dict = paddle.dataset.imdb.word_dict()
-        self.BATCH_SIZE = 100
+        self.BATCH_SIZE = 2
         self.train_data = paddle.batch(
             paddle.dataset.imdb.train(self.word_dict),
             batch_size=self.BATCH_SIZE)
@@ -55,6 +55,7 @@ class TestDynRNN(unittest.TestCase):
                 mem = fluid.layers.shrink_memory(x=mem, i=i, table=rank_table)
 
                 hidden = fluid.layers.fc(input=[mem, ipt], size=100, act='tanh')
+                fluid.layers.array_write(x=hidden, i=i, array=out)
 
                 fluid.layers.increment(x=i, in_place=True)
                 fluid.layers.array_write(x=hidden, i=i, array=mem_array)
@@ -82,6 +83,48 @@ class TestDynRNN(unittest.TestCase):
             print(val)
             self.assertFalse(numpy.isnan(val))
 
+    def test_train_dyn_rnn(self):
+        main_program = fluid.Program()
+        startup_program = fluid.Program()
+        with fluid.program_guard(main_program, startup_program):
+            sentence = fluid.layers.data(
+                name='word', shape=[1], dtype='int64', lod_level=1)
+            sent_emb = fluid.layers.embedding(
+                input=sentence, size=[len(self.word_dict), 32], dtype='float32')
+
+            rnn = fluid.layers.DynamicRNN()
+
+            with rnn.block():
+                in_ = rnn.step_input(sent_emb)
+                mem = rnn.memory(shape=[100], dtype='float32')
+                out_ = fluid.layers.fc(input=[in_, mem], size=100, act='tanh')
+                rnn.update_memory(mem, out_)
+                rnn.output(out_)
+
+            last = fluid.layers.sequence_pool(input=rnn(), pool_type='last')
+            logits = fluid.layers.fc(input=last, size=1, act=None)
+            label = fluid.layers.data(name='label', shape=[1], dtype='float32')
+            loss = fluid.layers.sigmoid_cross_entropy_with_logits(
+                x=logits, label=label)
+            loss = fluid.layers.mean(x=loss)
+            adam = fluid.optimizer.Adam(1e-3)
+            adam.minimize(loss=loss)
+
+        cpu = fluid.CPUPlace()
+        exe = fluid.Executor(cpu)
+        exe.run(startup_program)
+        feeder = fluid.DataFeeder(feed_list=[sentence, label], place=cpu)
+        data = next(self.train_data())
+        loss_0 = exe.run(main_program,
+                         feed=feeder.feed(data),
+                         fetch_list=[loss])[0]
+        for _ in xrange(100):
+            val = exe.run(main_program,
+                          feed=feeder.feed(data),
+                          fetch_list=[loss])[0]
+            # training repeatedly on the same mini-batch should keep the
+            # loss below its initial value
+            self.assertLess(val[0], loss_0[0])
+
 
 if __name__ == '__main__':
     unittest.main()
--
GitLab
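
Usage note: the `DynamicRNN` added to python/paddle/v2/fluid/layers.py above drives a raw `While` op under the hood. `step_input` builds a `lod_rank_table` and splits the batch into a `LOD_TENSOR_ARRAY`; `memory`/`update_memory` carry state across time steps, shrinking it to the still-active sequences via `shrink_memory`; and `block()` appends the `increment`/`less_than` ops that advance and test the loop counter. The sketch below distills the intended call pattern from `test_train_dyn_rnn`; the embedding size of 32 and hidden size of 100 match the test, while the vocabulary size of 30000 is an illustrative stand-in for `len(self.word_dict)`:

    import paddle.v2.fluid as fluid

    sentence = fluid.layers.data(
        name='word', shape=[1], dtype='int64', lod_level=1)
    # vocabulary size 30000 is a placeholder; the test uses len(self.word_dict)
    sent_emb = fluid.layers.embedding(
        input=sentence, size=[30000, 32], dtype='float32')

    rnn = fluid.layers.DynamicRNN()
    with rnn.block():                    # loop body, run once per time step
        word = rnn.step_input(sent_emb)  # current time step's batch of words
        prev = rnn.memory(shape=[100], dtype='float32')  # zero-initialized state
        hidden = fluid.layers.fc(input=[word, prev], size=100, act='tanh')
        rnn.update_memory(prev, hidden)  # carry the hidden state to the next step
        rnn.output(hidden)               # collect per-step outputs

    # rnn() re-packs the collected outputs into a LoDTensor, so ordinary
    # sequence ops can consume them
    last = fluid.layers.sequence_pool(input=rnn(), pool_type='last')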