Unverified commit a1319074 authored by zqw_1997, committed by GitHub

remove paddle.fluid.contrib.layers.BasicLSTMUnit, basic_lstm, BasicGRUUnit, basic_gru (#49268)

* rm paddle.fluid.contrib.layers.BasicLSTMUnit basic_lstm BasicGRUUnit basic_gru

* rm dependency in __init__.py
Parent cb34ee0f
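For anyone still depending on the removed helpers, a minimal migration sketch (not part of this commit): the dynamic-graph layers `paddle.nn.GRU` and `paddle.nn.LSTM` cover the same multi-layer, bidirectional use cases. The argument mapping below (`direction='bidirect'`, `time_major=False`, `sequence_length`) is an assumption based on the current paddle.nn API rather than an official equivalence table.

```python
# Hedged migration sketch: the removed basic_gru / basic_lstm with
# bidirectional=True and batch_first=True roughly correspond to the
# paddle.nn layers below (time_major=False means batch-major input).
import paddle

batch_size, seq_len, input_size, hidden_size, num_layers = 4, 6, 128, 256, 2
x = paddle.randn([batch_size, seq_len, input_size])
seq_lens = paddle.to_tensor([6, 5, 4, 3], dtype='int64')

gru = paddle.nn.GRU(input_size, hidden_size, num_layers=num_layers,
                    direction='bidirect', time_major=False, dropout=0.5)
gru_out, gru_last_h = gru(x, sequence_length=seq_lens)
# gru_out: [batch_size, seq_len, 2 * hidden_size]
# gru_last_h: [num_layers * 2, batch_size, hidden_size]

lstm = paddle.nn.LSTM(input_size, hidden_size, num_layers=num_layers,
                      direction='bidirect', time_major=False, dropout=0.5)
lstm_out, (last_h, last_c) = lstm(x, sequence_length=seq_lens)
# lstm_out: [batch_size, seq_len, 2 * hidden_size]
# last_h, last_c: [num_layers * 2, batch_size, hidden_size]
```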
......@@ -15,11 +15,9 @@
from . import nn
from .nn import *
from .rnn_impl import *
from . import metric_op
from .metric_op import *
__all__ = []
__all__ += nn.__all__
__all__ += rnn_impl.__all__
__all__ += metric_op.__all__
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import paddle
from paddle.fluid import layers, unique_name
from paddle.fluid.dygraph import Layer
from paddle.fluid.dygraph.layer_object_helper import LayerObjectHelper
from paddle.fluid.layers.control_flow import StaticRNN
import paddle
__all__ = ['BasicGRUUnit', 'basic_gru', 'BasicLSTMUnit', 'basic_lstm']
class BasicGRUUnit(Layer):
"""
****
BasicGRUUnit class, using basic operators to build GRU
The algorithm can be described as the equations below.
.. math::
u_t & = actGate(W_ux xu_{t} + W_uh h_{t-1} + b_u)
r_t & = actGate(W_rx xr_{t} + W_rh h_{t-1} + b_r)
m_t & = actNode(W_cx xm_t + W_ch dot(r_t, h_{t-1}) + b_m)
h_t & = dot(u_t, h_{t-1}) + dot((1-u_t), m_t)
Args:
name_scope(string) : The name scope used to identify parameters and biases
hidden_size (integer): The hidden size used in the Unit.
param_attr(ParamAttr|None): The parameter attribute for the learnable
weight matrix. Note:
If it is set to None or one attribute of ParamAttr, gru_unit will
create ParamAttr as param_attr. If the Initializer of the param_attr
is not set, the parameter is initialized with Xavier. Default: None.
bias_attr (ParamAttr|None): The parameter attribute for the bias
of GRU unit.
If it is set to None or one attribute of ParamAttr, gru_unit will
create ParamAttr as bias_attr. If the Initializer of the bias_attr
is not set, the bias is initialized zero. Default: None.
gate_activation (function|None): The activation function for gates (actGate).
Default: 'fluid.layers.sigmoid'
activation (function|None): The activation function for cell (actNode).
Default: 'fluid.layers.tanh'
dtype(string): data type used in this unit
Examples:
.. code-block:: python
import paddle.fluid.layers as layers
from paddle.fluid.contrib.layers import BasicGRUUnit
input_size = 128
hidden_size = 256
input = layers.data( name = "input", shape = [-1, input_size], dtype='float32')
pre_hidden = layers.data( name = "pre_hidden", shape=[-1, hidden_size], dtype='float32')
gru_unit = BasicGRUUnit( "gru_unit", hidden_size )
new_hidden = gru_unit( input, pre_hidden )
"""
def __init__(
self,
name_scope,
hidden_size,
param_attr=None,
bias_attr=None,
gate_activation=None,
activation=None,
dtype='float32',
):
super().__init__(name_scope, dtype)
# reserve old school _full_name and _helper for static graph save load
self._full_name = unique_name.generate(
name_scope + "/" + self.__class__.__name__
)
self._helper = LayerObjectHelper(self._full_name)
self._name = name_scope
self._hiden_size = hidden_size
self._param_attr = param_attr
self._bias_attr = bias_attr
self._gate_activation = gate_activation or paddle.nn.functional.sigmoid
self._activation = activation or paddle.tanh
self._dtype = dtype
def _build_once(self, input, pre_hidden):
self._input_size = input.shape[-1]
assert self._input_size > 0
if self._param_attr is not None and self._param_attr.name is not None:
gate_param_attr = copy.deepcopy(self._param_attr)
candidate_param_attr = copy.deepcopy(self._param_attr)
gate_param_attr.name += "_gate"
candidate_param_attr.name += "_candidate"
else:
gate_param_attr = self._param_attr
candidate_param_attr = self._param_attr
self._gate_weight = self.create_parameter(
attr=gate_param_attr,
shape=[self._input_size + self._hiden_size, 2 * self._hiden_size],
dtype=self._dtype,
)
self._candidate_weight = self.create_parameter(
attr=candidate_param_attr,
shape=[self._input_size + self._hiden_size, self._hiden_size],
dtype=self._dtype,
)
if self._bias_attr is not None and self._bias_attr.name is not None:
gate_bias_attr = copy.deepcopy(self._bias_attr)
candidate_bias_attr = copy.deepcopy(self._bias_attr)
gate_bias_attr.name += "_gate"
candidate_bias_attr.name += "_candidate"
else:
gate_bias_attr = self._bias_attr
candidate_bias_attr = self._bias_attr
self._gate_bias = self.create_parameter(
attr=gate_bias_attr,
shape=[2 * self._hiden_size],
dtype=self._dtype,
is_bias=True,
)
self._candidate_bias = self.create_parameter(
attr=candidate_bias_attr,
shape=[self._hiden_size],
dtype=self._dtype,
is_bias=True,
)
def forward(self, input, pre_hidden):
concat_input_hidden = layers.concat([input, pre_hidden], 1)
gate_input = paddle.matmul(x=concat_input_hidden, y=self._gate_weight)
gate_input = paddle.add(gate_input, self._gate_bias)
gate_input = self._gate_activation(gate_input)
r, u = paddle.split(gate_input, num_or_sections=2, axis=1)
r_hidden = r * pre_hidden
candidate = paddle.matmul(
layers.concat([input, r_hidden], 1), self._candidate_weight
)
candidate = paddle.add(candidate, self._candidate_bias)
c = self._activation(candidate)
new_hidden = u * pre_hidden + (1 - u) * c
return new_hidden
def basic_gru(
input,
init_hidden,
hidden_size,
num_layers=1,
sequence_length=None,
dropout_prob=0.0,
bidirectional=False,
batch_first=True,
param_attr=None,
bias_attr=None,
gate_activation=None,
activation=None,
dtype='float32',
name='basic_gru',
):
r"""
GRU implementation using basic operators; supports multiple layers and bidirectional GRU.
.. math::
u_t & = actGate(W_ux xu_{t} + W_uh h_{t-1} + b_u)
r_t & = actGate(W_rx xr_{t} + W_rh h_{t-1} + b_r)
m_t & = actNode(W_cx xm_t + W_ch dot(r_t, h_{t-1}) + b_m)
h_t & = dot(u_t, h_{t-1}) + dot((1-u_t), m_t)
Args:
input (Variable): GRU input tensor,
if batch_first = False, shape should be ( seq_len x batch_size x input_size )
if batch_first = True, shape should be ( batch_size x seq_len x input_size )
init_hidden(Variable|None): The initial hidden state of the GRU.
This is a tensor with shape ( num_layers x batch_size x hidden_size)
if bidirectional = True, the shape should be ( num_layers*2 x batch_size x hidden_size)
and can be reshaped to a tensor with shape ( num_layers x 2 x batch_size x hidden_size) to use.
If it's None, it will be set to all 0.
hidden_size (int): Hidden size of the GRU
num_layers (int): The total number of layers of the GRU
sequence_length (Variable|None): A Tensor (shape [batch_size]) that stores the real length of each instance.
This tensor will be converted to a mask that masks the padding ids.
If it is None, there are no padding ids.
dropout_prob(float|0.0): Dropout probability. Dropout ONLY works on the rnn output of each layer,
NOT between time steps
bidirectional (bool|False): If it is bidirectional
batch_first (bool|True): The shape format of the input and output tensors. If true,
the shape format should be :attr:`[batch_size, seq_len, hidden_size]`. If false,
the shape format should be :attr:`[seq_len, batch_size, hidden_size]`. By default
this function accepts input and emits output in batch-major form to be consistent
with most of data format, though a bit less efficient because of extra transposes.
param_attr(ParamAttr|None): The parameter attribute for the learnable
weight matrix. Note:
If it is set to None or one attribute of ParamAttr, gru_unit will
create ParamAttr as param_attr. If the Initializer of the param_attr
is not set, the parameter is initialized with Xavier. Default: None.
bias_attr (ParamAttr|None): The parameter attribute for the bias
of GRU unit.
If it is set to None or one attribute of ParamAttr, gru_unit will
create ParamAttr as bias_attr. If the Initializer of the bias_attr
is not set, the bias is initialized zero. Default: None.
gate_activation (function|None): The activation function for gates (actGate).
Default: 'fluid.layers.sigmoid'
activation (function|None): The activation function for cell (actNode).
Default: 'fluid.layers.tanh'
dtype(string): data type used in this unit
name(string): name used to identify parameters and biases
Returns:
rnn_out(Tensor),last_hidden(Tensor)
- rnn_out is the result of the GRU hidden state, with shape (seq_len x batch_size x hidden_size) \
if bidirectional is set to True, the shape will be ( seq_len x batch_size x hidden_size*2)
- last_hidden is the hidden state of the last step of the GRU \
with shape ( num_layers x batch_size x hidden_size ) \
if bidirectional is set to True, the shape will be ( num_layers*2 x batch_size x hidden_size),
and can be reshaped to a tensor with shape ( num_layers x 2 x batch_size x hidden_size)
Examples:
.. code-block:: python
import paddle.fluid.layers as layers
from paddle.fluid.contrib.layers import basic_gru
batch_size = 20
input_size = 128
hidden_size = 256
num_layers = 2
dropout = 0.5
bidirectional = True
batch_first = False
input = layers.data( name = "input", shape = [-1, batch_size, input_size], dtype='float32')
pre_hidden = layers.data( name = "pre_hidden", shape=[-1, hidden_size], dtype='float32')
sequence_length = layers.data( name="sequence_length", shape=[-1], dtype='int32')
rnn_out, last_hidden = basic_gru( input, pre_hidden, hidden_size, num_layers = num_layers, \
sequence_length = sequence_length, dropout_prob=dropout, bidirectional = bidirectional, \
batch_first = batch_first)
"""
fw_unit_list = []
for i in range(num_layers):
new_name = name + "_layers_" + str(i)
if param_attr is not None and param_attr.name is not None:
layer_param_attr = copy.deepcopy(param_attr)
layer_param_attr.name += "_fw_w_" + str(i)
else:
layer_param_attr = param_attr
if bias_attr is not None and bias_attr.name is not None:
layer_bias_attr = copy.deepcopy(bias_attr)
layer_bias_attr.name += "_fw_b_" + str(i)
else:
layer_bias_attr = bias_attr
fw_unit_list.append(
BasicGRUUnit(
new_name,
hidden_size,
layer_param_attr,
layer_bias_attr,
gate_activation,
activation,
dtype,
)
)
if bidirectional:
bw_unit_list = []
for i in range(num_layers):
new_name = name + "_reverse_layers_" + str(i)
if param_attr is not None and param_attr.name is not None:
layer_param_attr = copy.deepcopy(param_attr)
layer_param_attr.name += "_bw_w_" + str(i)
else:
layer_param_attr = param_attr
if bias_attr is not None and bias_attr.name is not None:
layer_bias_attr = copy.deepcopy(bias_attr)
layer_bias_attr.name += "_bw_b_" + str(i)
else:
layer_bias_attr = bias_attr
bw_unit_list.append(
BasicGRUUnit(
new_name,
hidden_size,
layer_param_attr,
layer_bias_attr,
gate_activation,
activation,
dtype,
)
)
if batch_first:
input = paddle.transpose(input, [1, 0, 2])
mask = None
if sequence_length:
max_seq_len = paddle.shape(input)[0]
mask = layers.sequence_mask(
sequence_length, maxlen=max_seq_len, dtype='float32'
)
mask = paddle.transpose(mask, [1, 0])
direc_num = 1
if bidirectional:
direc_num = 2
if init_hidden:
init_hidden = paddle.reshape(
init_hidden, shape=[num_layers, direc_num, -1, hidden_size]
)
def get_single_direction_output(
rnn_input, unit_list, mask=None, direc_index=0
):
rnn = StaticRNN()
with rnn.step():
step_input = rnn.step_input(rnn_input)
if mask:
step_mask = rnn.step_input(mask)
for i in range(num_layers):
if init_hidden:
pre_hidden = rnn.memory(init=init_hidden[i, direc_index])
else:
pre_hidden = rnn.memory(
batch_ref=rnn_input,
shape=[-1, hidden_size],
ref_batch_dim_idx=1,
)
new_hidden = unit_list[i](step_input, pre_hidden)
if mask:
new_hidden = paddle.tensor.math._multiply_with_axis(
new_hidden, step_mask, axis=0
) - paddle.tensor.math._multiply_with_axis(
pre_hidden, (step_mask - 1), axis=0
)
rnn.update_memory(pre_hidden, new_hidden)
rnn.step_output(new_hidden)
step_input = new_hidden
if dropout_prob is not None and dropout_prob > 0.0:
step_input = paddle.nn.functional.dropout(
step_input,
p=dropout_prob,
)
rnn.step_output(step_input)
rnn_out = rnn()
last_hidden_array = []
rnn_output = rnn_out[-1]
for i in range(num_layers):
last_hidden = rnn_out[i]
last_hidden = last_hidden[-1]
last_hidden_array.append(last_hidden)
last_hidden_output = layers.concat(last_hidden_array, axis=0)
last_hidden_output = paddle.reshape(
last_hidden_output, shape=[num_layers, -1, hidden_size]
)
return rnn_output, last_hidden_output
# seq_len, batch_size, hidden_size
fw_rnn_out, fw_last_hidden = get_single_direction_output(
input, fw_unit_list, mask, direc_index=0
)
if bidirectional:
bw_input = paddle.reverse(input, axis=[0])
bw_mask = None
if mask:
bw_mask = paddle.reverse(mask, axis=[0])
bw_rnn_out, bw_last_hidden = get_single_direction_output(
bw_input, bw_unit_list, bw_mask, direc_index=1
)
bw_rnn_out = paddle.reverse(bw_rnn_out, axis=[0])
rnn_out = layers.concat([fw_rnn_out, bw_rnn_out], axis=2)
last_hidden = layers.concat([fw_last_hidden, bw_last_hidden], axis=1)
last_hidden = paddle.reshape(
last_hidden, shape=[num_layers * direc_num, -1, hidden_size]
)
if batch_first:
rnn_out = paddle.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden
else:
rnn_out = fw_rnn_out
last_hidden = fw_last_hidden
if batch_first:
rnn_out = paddle.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden
def basic_lstm(
input,
init_hidden,
init_cell,
hidden_size,
num_layers=1,
sequence_length=None,
dropout_prob=0.0,
bidirectional=False,
batch_first=True,
param_attr=None,
bias_attr=None,
gate_activation=None,
activation=None,
forget_bias=1.0,
dtype='float32',
name='basic_lstm',
):
r"""
LSTM implementation using basic operators, supports multiple layers and bidirectional LSTM.
.. math::
i_t &= \sigma(W_{ix}x_{t} + W_{ih}h_{t-1} + b_i)
f_t &= \sigma(W_{fx}x_{t} + W_{fh}h_{t-1} + b_f + forget_bias )
o_t &= \sigma(W_{ox}x_{t} + W_{oh}h_{t-1} + b_o)
\\tilde{c_t} &= tanh(W_{cx}x_t + W_{ch}h_{t-1} + b_c)
c_t &= f_t \odot c_{t-1} + i_t \odot \\tilde{c_t}
h_t &= o_t \odot tanh(c_t)
Args:
input (Variable): LSTM input tensor,
if batch_first = False, shape should be ( seq_len x batch_size x input_size )
if batch_first = True, shape should be ( batch_size x seq_len x input_size )
init_hidden(Variable|None): The initial hidden state of the LSTM.
This is a tensor with shape ( num_layers x batch_size x hidden_size)
if bidirectional = True, the shape should be ( num_layers*2 x batch_size x hidden_size)
and can be reshaped to a tensor with shape ( num_layers x 2 x batch_size x hidden_size) to use.
If it's None, it will be set to all 0.
init_cell(Variable|None): The initial cell state of the LSTM.
This is a tensor with shape ( num_layers x batch_size x hidden_size)
if bidirectional = True, the shape should be ( num_layers*2 x batch_size x hidden_size)
and can be reshaped to a tensor with shape ( num_layers x 2 x batch_size x hidden_size) to use.
If it's None, it will be set to all 0.
hidden_size (int): Hidden size of the LSTM
num_layers (int): The total number of layers of the LSTM
sequence_length (Variable|None): A tensor (shape [batch_size]) that stores the real length of each instance.
This tensor will be converted to a mask that masks the padding ids.
If it is None, there are no padding ids.
dropout_prob(float|0.0): Dropout probability. Dropout ONLY works on the rnn output of each layer,
NOT between time steps
bidirectional (bool|False): If it is bidirectional
batch_first (bool|True): The shape format of the input and output tensors. If true,
the shape format should be :attr:`[batch_size, seq_len, hidden_size]`. If false,
the shape format should be :attr:`[seq_len, batch_size, hidden_size]`. By default
this function accepts input and emits output in batch-major form to be consistent
with most of data format, though a bit less efficient because of extra transposes.
param_attr(ParamAttr|None): The parameter attribute for the learnable
weight matrix. Note:
If it is set to None or one attribute of ParamAttr, lstm_unit will
create ParamAttr as param_attr. If the Initializer of the param_attr
is not set, the parameter is initialized with Xavier. Default: None.
bias_attr (ParamAttr|None): The parameter attribute for the bias
of LSTM unit.
If it is set to None or one attribute of ParamAttr, lstm_unit will
create ParamAttr as bias_attr. If the Initializer of the bias_attr
is not set, the bias is initialized zero. Default: None.
gate_activation (function|None): The activation function for gates (actGate).
Default: 'fluid.layers.sigmoid'
activation (function|None): The activation function for cell (actNode).
Default: 'fluid.layers.tanh'
forget_bias (float|1.0) : Forget bias used to compute the forget gate
dtype(string): Data type used in this unit
name(string): Name used to identify parameters and biases
Returns:
rnn_out(Tensor), last_hidden(Tensor), last_cell(Tensor)
- rnn_out is the result of the LSTM hidden state, with shape (seq_len x batch_size x hidden_size) \
if bidirectional is set to True, its shape will be ( seq_len x batch_size x hidden_size*2)
- last_hidden is the hidden state of the last step of the LSTM \
with shape ( num_layers x batch_size x hidden_size ) \
if bidirectional is set to True, its shape will be ( num_layers*2 x batch_size x hidden_size),
and can be reshaped to a tensor ( num_layers x 2 x batch_size x hidden_size) to use.
- last_cell is the cell state of the last step of the LSTM \
with shape ( num_layers x batch_size x hidden_size ) \
if bidirectional is set to True, its shape will be ( num_layers*2 x batch_size x hidden_size),
and can be reshaped to a tensor ( num_layers x 2 x batch_size x hidden_size) to use.
Examples:
.. code-block:: python
import paddle.fluid.layers as layers
from paddle.fluid.contrib.layers import basic_lstm
batch_size = 20
input_size = 128
hidden_size = 256
num_layers = 2
dropout = 0.5
bidirectional = True
batch_first = False
input = layers.data( name = "input", shape = [-1, batch_size, input_size], dtype='float32')
pre_hidden = layers.data( name = "pre_hidden", shape=[-1, hidden_size], dtype='float32')
pre_cell = layers.data( name = "pre_cell", shape=[-1, hidden_size], dtype='float32')
sequence_length = layers.data( name="sequence_length", shape=[-1], dtype='int32')
rnn_out, last_hidden, last_cell = basic_lstm( input, pre_hidden, pre_cell, \
hidden_size, num_layers = num_layers, \
sequence_length = sequence_length, dropout_prob=dropout, bidirectional = bidirectional, \
batch_first = batch_first)
"""
fw_unit_list = []
for i in range(num_layers):
new_name = name + "_layers_" + str(i)
if param_attr is not None and param_attr.name is not None:
layer_param_attr = copy.deepcopy(param_attr)
layer_param_attr.name += "_fw_w_" + str(i)
else:
layer_param_attr = param_attr
if bias_attr is not None and bias_attr.name is not None:
layer_bias_attr = copy.deepcopy(bias_attr)
layer_bias_attr.name += "_fw_b_" + str(i)
else:
layer_bias_attr = bias_attr
fw_unit_list.append(
BasicLSTMUnit(
new_name,
hidden_size,
param_attr=layer_param_attr,
bias_attr=layer_bias_attr,
gate_activation=gate_activation,
activation=activation,
forget_bias=forget_bias,
dtype=dtype,
)
)
if bidirectional:
bw_unit_list = []
for i in range(num_layers):
new_name = name + "_reverse_layers_" + str(i)
if param_attr is not None and param_attr.name is not None:
layer_param_attr = copy.deepcopy(param_attr)
layer_param_attr.name += "_bw_w_" + str(i)
else:
layer_param_attr = param_attr
if bias_attr is not None and bias_attr.name is not None:
layer_bias_attr = copy.deepcopy(bias_attr)
layer_bias_attr.name += "_bw_b_" + str(i)
else:
layer_bias_attr = bias_attr
bw_unit_list.append(
BasicLSTMUnit(
new_name,
hidden_size,
param_attr=layer_param_attr,
bias_attr=layer_bias_attr,
gate_activation=gate_activation,
activation=activation,
forget_bias=forget_bias,
dtype=dtype,
)
)
if batch_first:
input = paddle.transpose(input, [1, 0, 2])
mask = None
if sequence_length:
max_seq_len = paddle.shape(input)[0]
mask = layers.sequence_mask(
sequence_length, maxlen=max_seq_len, dtype='float32'
)
mask = paddle.transpose(mask, [1, 0])
direc_num = 1
if bidirectional:
direc_num = 2
# convert to [num_layers, 2, batch_size, hidden_size]
if init_hidden:
init_hidden = paddle.reshape(
init_hidden, shape=[num_layers, direc_num, -1, hidden_size]
)
init_cell = paddle.reshape(
init_cell, shape=[num_layers, direc_num, -1, hidden_size]
)
# forward direction
def get_single_direction_output(
rnn_input, unit_list, mask=None, direc_index=0
):
rnn = StaticRNN()
with rnn.step():
step_input = rnn.step_input(rnn_input)
if mask:
step_mask = rnn.step_input(mask)
for i in range(num_layers):
if init_hidden:
pre_hidden = rnn.memory(init=init_hidden[i, direc_index])
pre_cell = rnn.memory(init=init_cell[i, direc_index])
else:
pre_hidden = rnn.memory(
batch_ref=rnn_input, shape=[-1, hidden_size]
)
pre_cell = rnn.memory(
batch_ref=rnn_input, shape=[-1, hidden_size]
)
new_hidden, new_cell = unit_list[i](
step_input, pre_hidden, pre_cell
)
if mask:
new_hidden = paddle.tensor.math._multiply_with_axis(
new_hidden, step_mask, axis=0
) - paddle.tensor.math._multiply_with_axis(
pre_hidden, (step_mask - 1), axis=0
)
new_cell = paddle.tensor.math._multiply_with_axis(
new_cell, step_mask, axis=0
) - paddle.tensor.math._multiply_with_axis(
pre_cell, (step_mask - 1), axis=0
)
rnn.update_memory(pre_hidden, new_hidden)
rnn.update_memory(pre_cell, new_cell)
rnn.step_output(new_hidden)
rnn.step_output(new_cell)
step_input = new_hidden
if dropout_prob is not None and dropout_prob > 0.0:
step_input = paddle.nn.functional.dropout(
step_input,
p=dropout_prob,
mode='upscale_in_train',
)
rnn.step_output(step_input)
rnn_out = rnn()
last_hidden_array = []
last_cell_array = []
rnn_output = rnn_out[-1]
for i in range(num_layers):
last_hidden = rnn_out[i * 2]
last_hidden = last_hidden[-1]
last_hidden_array.append(last_hidden)
last_cell = rnn_out[i * 2 + 1]
last_cell = last_cell[-1]
last_cell_array.append(last_cell)
last_hidden_output = layers.concat(last_hidden_array, axis=0)
last_hidden_output = paddle.reshape(
last_hidden_output, shape=[num_layers, -1, hidden_size]
)
last_cell_output = layers.concat(last_cell_array, axis=0)
last_cell_output = paddle.reshape(
last_cell_output, shape=[num_layers, -1, hidden_size]
)
return rnn_output, last_hidden_output, last_cell_output
# seq_len, batch_size, hidden_size
fw_rnn_out, fw_last_hidden, fw_last_cell = get_single_direction_output(
input, fw_unit_list, mask, direc_index=0
)
if bidirectional:
bw_input = paddle.reverse(input, axis=[0])
bw_mask = None
if mask:
bw_mask = paddle.reverse(mask, axis=[0])
bw_rnn_out, bw_last_hidden, bw_last_cell = get_single_direction_output(
bw_input, bw_unit_list, bw_mask, direc_index=1
)
bw_rnn_out = paddle.reverse(bw_rnn_out, axis=[0])
rnn_out = layers.concat([fw_rnn_out, bw_rnn_out], axis=2)
last_hidden = layers.concat([fw_last_hidden, bw_last_hidden], axis=1)
last_hidden = paddle.reshape(
last_hidden, shape=[num_layers * direc_num, -1, hidden_size]
)
last_cell = layers.concat([fw_last_cell, bw_last_cell], axis=1)
last_cell = paddle.reshape(
last_cell, shape=[num_layers * direc_num, -1, hidden_size]
)
if batch_first:
rnn_out = paddle.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden, last_cell
else:
rnn_out = fw_rnn_out
last_hidden = fw_last_hidden
last_cell = fw_last_cell
if batch_first:
rnn_out = paddle.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden, last_cell
class BasicLSTMUnit(Layer):
r"""
****
BasicLSTMUnit class, using basic operators to build LSTM
The algorithm can be described by the equations below.
.. math::
i_t &= \sigma(W_{ix}x_{t} + W_{ih}h_{t-1} + b_i)
f_t &= \sigma(W_{fx}x_{t} + W_{fh}h_{t-1} + b_f + forget_bias )
o_t &= \sigma(W_{ox}x_{t} + W_{oh}h_{t-1} + b_o)
\\tilde{c_t} &= tanh(W_{cx}x_t + W_{ch}h_{t-1} + b_c)
c_t &= f_t \odot c_{t-1} + i_t \odot \\tilde{c_t}
h_t &= o_t \odot tanh(c_t)
- $W$ terms denote weight matrices (e.g. $W_{ix}$ is the matrix
of weights from the input to the input gate)
- The $b$ terms denote bias vectors ($b_i$ is the input gate bias vector).
- sigmoid is the logistic sigmoid function.
- $i, f, o$ and $c$ are the input gate, forget gate, output gate,
and cell activation vectors, respectively, all of which have the same size as
the cell output activation vector $h$.
- The :math:`\odot` is the element-wise product of the vectors.
- :math:`tanh` is the activation function.
- :math:`\\tilde{c_t}` is also called candidate hidden state,
which is computed based on the current input and the previous hidden state.
Args:
name_scope(string) : The name scope used to identify parameter and bias name
hidden_size (integer): The hidden size used in the Unit.
param_attr(ParamAttr|None): The parameter attribute for the learnable
weight matrix. Note:
If it is set to None or one attribute of ParamAttr, lstm_unit will
create ParamAttr as param_attr. If the Initializer of the param_attr
is not set, the parameter is initialized with Xavier. Default: None.
bias_attr (ParamAttr|None): The parameter attribute for the bias
of LSTM unit.
If it is set to None or one attribute of ParamAttr, lstm_unit will
create ParamAttr as bias_attr. If the Initializer of the bias_attr
is not set, the bias is initialized as zero. Default: None.
gate_activation (function|None): The activation function for gates (actGate).
Default: 'fluid.layers.sigmoid'
activation (function|None): The activation function for cells (actNode).
Default: 'fluid.layers.tanh'
forget_bias(float|1.0): forget bias used when computing forget gate
dtype(string): data type used in this unit
Examples:
.. code-block:: python
import paddle.fluid.layers as layers
from paddle.fluid.contrib.layers import BasicLSTMUnit
input_size = 128
hidden_size = 256
input = layers.data( name = "input", shape = [-1, input_size], dtype='float32')
pre_hidden = layers.data( name = "pre_hidden", shape=[-1, hidden_size], dtype='float32')
pre_cell = layers.data( name = "pre_cell", shape=[-1, hidden_size], dtype='float32')
lstm_unit = BasicLSTMUnit( "lstm_unit", hidden_size)
new_hidden, new_cell = lstm_unit( input, pre_hidden, pre_cell )
"""
def __init__(
self,
name_scope,
hidden_size,
param_attr=None,
bias_attr=None,
gate_activation=None,
activation=None,
forget_bias=1.0,
dtype='float32',
):
super().__init__(name_scope, dtype)
# reserve old school _full_name and _helper for static graph save load
self._full_name = unique_name.generate(
name_scope + "/" + self.__class__.__name__
)
self._helper = LayerObjectHelper(self._full_name)
self._name = name_scope
self._hiden_size = hidden_size
self._param_attr = param_attr
self._bias_attr = bias_attr
self._gate_activation = gate_activation or paddle.nn.functional.sigmoid
self._activation = activation or paddle.tanh
self._forget_bias = layers.fill_constant(
[1], dtype=dtype, value=forget_bias
)
self._forget_bias.stop_gradient = False
self._dtype = dtype
def _build_once(self, input, pre_hidden, pre_cell):
self._input_size = input.shape[-1]
assert self._input_size > 0
self._weight = self.create_parameter(
attr=self._param_attr,
shape=[self._input_size + self._hiden_size, 4 * self._hiden_size],
dtype=self._dtype,
)
self._bias = self.create_parameter(
attr=self._bias_attr,
shape=[4 * self._hiden_size],
dtype=self._dtype,
is_bias=True,
)
def forward(self, input, pre_hidden, pre_cell):
concat_input_hidden = layers.concat([input, pre_hidden], 1)
gate_input = paddle.matmul(x=concat_input_hidden, y=self._weight)
gate_input = paddle.add(gate_input, self._bias)
i, j, f, o = paddle.split(gate_input, num_or_sections=4, axis=-1)
new_cell = paddle.add(
paddle.multiply(
pre_cell,
paddle.nn.functional.sigmoid(paddle.add(f, self._forget_bias)),
),
paddle.multiply(paddle.nn.functional.sigmoid(i), paddle.tanh(j)),
)
new_hidden = paddle.tanh(new_cell) * paddle.nn.functional.sigmoid(o)
return new_hidden, new_cell
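As a cross-check on the gate equations in the docstrings above, a small NumPy sketch of the single-step update performed by `BasicLSTMUnit.forward()` (gate order i, j, f, o follows the `paddle.split` call; the names and shapes here are illustrative only, and the removed unit tests further below use the same reference math):

```python
# NumPy sketch of one BasicLSTMUnit step: a single fused weight of shape
# [input_size + hidden_size, 4 * hidden_size] and a fused bias, with
# forget_bias added to the forget gate before the sigmoid.
import numpy as np

def lstm_cell_step(x, h_prev, c_prev, weight, bias, forget_bias=1.0):
    gate_input = np.concatenate([x, h_prev], axis=1) @ weight + bias
    i, j, f, o = np.split(gate_input, 4, axis=1)
    sigmoid = lambda v: 1.0 / (1.0 + np.exp(-v))
    c_new = c_prev * sigmoid(f + forget_bias) + sigmoid(i) * np.tanh(j)
    h_new = np.tanh(c_new) * sigmoid(o)
    return h_new, c_new

# Toy usage with arbitrary values, just to exercise the shapes.
rng = np.random.default_rng(0)
input_size, hidden_size, batch = 3, 4, 2
w = rng.standard_normal((input_size + hidden_size, 4 * hidden_size)).astype('float32')
b = np.zeros(4 * hidden_size, dtype='float32')
h, c = lstm_cell_step(rng.standard_normal((batch, input_size)).astype('float32'),
                      np.zeros((batch, hidden_size), dtype='float32'),
                      np.zeros((batch, hidden_size), dtype='float32'), w, b)
```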
......@@ -444,10 +444,6 @@ list(REMOVE_ITEM TEST_OPS
list(REMOVE_ITEM TEST_OPS test_imperative_ocr_attention_model)
list(REMOVE_ITEM TEST_OPS test_async_ssa_graph_executor_mnist)
list(REMOVE_ITEM TEST_OPS test_install_check)
list(REMOVE_ITEM TEST_OPS test_basic_gru_api)
list(REMOVE_ITEM TEST_OPS test_basic_gru_unit_op)
list(REMOVE_ITEM TEST_OPS test_basic_lstm_api)
list(REMOVE_ITEM TEST_OPS test_basic_lstm_unit_op)
list(REMOVE_ITEM TEST_OPS test_fuse_all_reduce_pass)
list(REMOVE_ITEM TEST_OPS test_fuse_bn_act_pass)
list(REMOVE_ITEM TEST_OPS test_fuse_bn_add_act_pass)
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid import framework
from paddle.fluid.contrib.layers import basic_gru
from paddle.fluid.executor import Executor
np.random.seed(123)
SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
EXP_MAX_INPUT = 40.0
def sigmoid(x):
y = np.copy(x)
y[x < SIGMOID_THRESHOLD_MIN] = SIGMOID_THRESHOLD_MIN
y[x > SIGMOID_THRESHOLD_MAX] = SIGMOID_THRESHOLD_MAX
return 1.0 / (1.0 + np.exp(-y))
def tanh(x):
y = -2.0 * x
y[y > EXP_MAX_INPUT] = EXP_MAX_INPUT
return (2.0 / (1.0 + np.exp(y))) - 1.0
def gru_np(
input,
init_h,
hidden_size,
gate_weight,
gate_bias,
candidate_weight,
candidate_bias,
num_layers=1,
batch_first=False,
is_bidirect=False,
sequence_length=None,
):
def step(step_in, pre_hidden, gate_w, gate_b, candidate_w, candidate_b):
concat_1 = np.concatenate([step_in, pre_hidden], 1)
gate_input = np.matmul(concat_1, gate_w)
gate_input += gate_b
gate_input = sigmoid(gate_input)
r, u = np.split(gate_input, indices_or_sections=2, axis=1)
r_hidden = r * pre_hidden
candidate = np.matmul(
np.concatenate([step_in, r_hidden], 1), candidate_w
)
candidate += candidate_b
c = tanh(candidate)
new_hidden = u * pre_hidden + (1 - u) * c
return new_hidden
if batch_first:
input = np.transpose(input, [1, 0, 2])
batch_size = input.shape[1]
mask = None
if sequence_length is not None:
max_seq_len = input.shape[0]
mask = np.zeros([batch_size, max_seq_len])
for i, len in enumerate(sequence_length):
mask[i, :len] = 1.0
mask = np.transpose(mask, [1, 0])
direc_num = 1
if is_bidirect:
direc_num = 2
if init_h is not None:
init_h = np.reshape(
init_h, [num_layers, direc_num, -1, hidden_size]
)
else:
init_h = np.zeros([num_layers, direc_num, batch_size, hidden_size])
def get_single_direction_output(rnn_input, mask=None, direc_index=0):
seq_len = rnn_input.shape[0]
output = []
# init pre hidden
pre_hidden_array = []
for i in range(num_layers):
pre_hidden_array.append(init_h[i, direc_index])
for i in range(seq_len):
step_input = rnn_input[i]
if mask is not None:
step_mask = mask[i]
step_mask = np.reshape(step_mask, [-1, 1])
for i in range(num_layers):
new_hidden = step(
step_input,
pre_hidden_array[i],
gate_weight[direc_index * num_layers + i],
gate_bias[direc_index * num_layers + i],
candidate_weight[direc_index * num_layers + i],
candidate_bias[direc_index * num_layers + i],
)
if mask is not None:
new_hidden = (
new_hidden * step_mask
+ (1 - step_mask) * pre_hidden_array[i]
)
pre_hidden_array[i] = new_hidden
step_input = new_hidden
output.append(step_input)
rnn_out = np.concatenate(output, 0)
rnn_out = np.reshape(rnn_out, [seq_len, -1, hidden_size])
last_hidden_out = np.concatenate(pre_hidden_array, 0)
last_hidden_out = np.reshape(
last_hidden_out, [num_layers, -1, hidden_size]
)
return rnn_out, last_hidden_out
fw_rnn_out, fw_last_hidden = get_single_direction_output(
input, mask, direc_index=0
)
if is_bidirect:
bw_input = input[::-1]
bw_mask = None
if mask is not None:
bw_mask = mask[::-1]
bw_rnn_out, bw_last_hidden = get_single_direction_output(
bw_input, bw_mask, direc_index=1
)
bw_rnn_out = bw_rnn_out[::-1]
rnn_out = np.concatenate([fw_rnn_out, bw_rnn_out], 2)
last_hidden = np.concatenate([fw_last_hidden, bw_last_hidden], 1)
last_hidden = np.reshape(
last_hidden, [num_layers * direc_num, -1, hidden_size]
)
if batch_first:
rnn_out = np.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden
else:
rnn_out = fw_rnn_out
last_hidden = fw_last_hidden
if batch_first:
rnn_out = np.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden
class TestBasicGRUApi(unittest.TestCase):
def setUp(self):
self.hidden_size = 10
self.batch_size = 5
self.seq_len = 6
self.num_layers = 2
self.is_bidirect = True
self.batch_first = False
def test_run(self):
x = layers.data(
name='x',
shape=[-1, self.batch_size, self.hidden_size],
dtype='float32',
)
sequence_length = layers.data(
name="sequence_length", shape=[-1], dtype='float32'
)
rnn_out, last_hidden = basic_gru(
x,
None,
self.hidden_size,
num_layers=self.num_layers,
batch_first=self.batch_first,
bidirectional=self.is_bidirect,
sequence_length=sequence_length,
)
last_hidden.persistable = True
rnn_out.persistable = True
if core.is_compiled_with_cuda():
place = core.CUDAPlace(0)
else:
place = core.CPUPlace()
exe = Executor(place)
exe.run(framework.default_startup_program())
param_list = fluid.default_main_program().block(0).all_parameters()
# process weight and bias
gate_weight = []
gate_bias = []
candidate_weight = []
candidate_bias = []
for i in range(self.num_layers):
gate_w_name = "basic_gru_layers_" + str(i) + "/BasicGRUUnit_0.w_0"
gate_b_name = "basic_gru_layers_" + str(i) + "/BasicGRUUnit_0.b_0"
candidate_w_name = (
"basic_gru_layers_" + str(i) + "/BasicGRUUnit_0.w_1"
)
candidate_b_name = (
"basic_gru_layers_" + str(i) + "/BasicGRUUnit_0.b_1"
)
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
candidate_w = np.array(
fluid.global_scope().find_var(candidate_w_name).get_tensor()
)
candidate_w = np.random.uniform(
-0.1, 0.1, size=candidate_w.shape
).astype('float32')
fluid.global_scope().find_var(candidate_w_name).get_tensor().set(
candidate_w, place
)
candidate_b = np.array(
fluid.global_scope().find_var(candidate_b_name).get_tensor()
)
candidate_b = np.random.uniform(
-0.1, 0.1, size=candidate_b.shape
).astype('float32')
fluid.global_scope().find_var(candidate_b_name).get_tensor().set(
candidate_b, place
)
gate_weight.append(gate_w)
gate_bias.append(gate_b)
candidate_weight.append(candidate_w)
candidate_bias.append(candidate_b)
if self.is_bidirect:
for i in range(self.num_layers):
gate_w_name = (
"basic_gru_reverse_layers_" + str(i) + "/BasicGRUUnit_0.w_0"
)
gate_b_name = (
"basic_gru_reverse_layers_" + str(i) + "/BasicGRUUnit_0.b_0"
)
candidate_w_name = (
"basic_gru_reverse_layers_" + str(i) + "/BasicGRUUnit_0.w_1"
)
candidate_b_name = (
"basic_gru_reverse_layers_" + str(i) + "/BasicGRUUnit_0.b_1"
)
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
candidate_w = np.array(
fluid.global_scope().find_var(candidate_w_name).get_tensor()
)
candidate_w = np.random.uniform(
-0.1, 0.1, size=candidate_w.shape
).astype('float32')
fluid.global_scope().find_var(
candidate_w_name
).get_tensor().set(candidate_w, place)
candidate_b = np.array(
fluid.global_scope().find_var(candidate_b_name).get_tensor()
)
candidate_b = np.random.uniform(
-0.1, 0.1, size=candidate_b.shape
).astype('float32')
fluid.global_scope().find_var(
candidate_b_name
).get_tensor().set(candidate_b, place)
gate_weight.append(gate_w)
gate_bias.append(gate_b)
candidate_weight.append(candidate_w)
candidate_bias.append(candidate_b)
step_input_np = np.random.uniform(
-0.1, 0.1, (self.seq_len, self.batch_size, self.hidden_size)
).astype('float32')
sequence_length_np = np.random.randint(
self.seq_len // 2, self.seq_len, size=(self.batch_size)
).astype('int64')
out = exe.run(
feed={'x': step_input_np, 'sequence_length': sequence_length_np},
fetch_list=[rnn_out, last_hidden],
)
api_rnn_out = out[0]
api_last_hidden = out[1]
np_out = gru_np(
step_input_np,
None,
self.hidden_size,
gate_weight,
gate_bias,
candidate_weight,
candidate_bias,
num_layers=self.num_layers,
batch_first=self.batch_first,
is_bidirect=self.is_bidirect,
sequence_length=sequence_length_np,
)
np.testing.assert_allclose(api_rnn_out, np_out[0], rtol=0.0001, atol=0)
np.testing.assert_allclose(
api_last_hidden, np_out[1], rtol=0.0001, atol=0
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid import framework
from paddle.fluid.contrib.layers import BasicGRUUnit
from paddle.fluid.executor import Executor
np.random.seed(123)
SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
EXP_MAX_INPUT = 40.0
def sigmoid(x):
y = np.copy(x)
y[x < SIGMOID_THRESHOLD_MIN] = SIGMOID_THRESHOLD_MIN
y[x > SIGMOID_THRESHOLD_MAX] = SIGMOID_THRESHOLD_MAX
return 1.0 / (1.0 + np.exp(-y))
def tanh(x):
y = -2.0 * x
y[y > EXP_MAX_INPUT] = EXP_MAX_INPUT
return (2.0 / (1.0 + np.exp(y))) - 1.0
def step(step_in, pre_hidden, gate_w, gate_b, candidate_w, candidate_b):
concat_1 = np.concatenate([step_in, pre_hidden], 1)
gate_input = np.matmul(concat_1, gate_w)
gate_input += gate_b
gate_input = sigmoid(gate_input)
r, u = np.split(gate_input, indices_or_sections=2, axis=1)
r_hidden = r * pre_hidden
candidate = np.matmul(np.concatenate([step_in, r_hidden], 1), candidate_w)
candidate += candidate_b
c = tanh(candidate)
new_hidden = u * pre_hidden + (1 - u) * c
return new_hidden
class TestBasicGRUUnit(unittest.TestCase):
def setUp(self):
self.hidden_size = 5
self.batch_size = 5
def test_run(self):
x = layers.data(name='x', shape=[-1, self.hidden_size], dtype='float32')
pre_hidden = layers.data(
name="pre_hidden", shape=[-1, self.hidden_size], dtype='float32'
)
gru_unit = BasicGRUUnit("gru_unit", self.hidden_size)
new_hidden = gru_unit(x, pre_hidden)
new_hidden.persistable = True
if core.is_compiled_with_cuda():
place = core.CUDAPlace(0)
else:
place = core.CPUPlace()
exe = Executor(place)
exe.run(framework.default_startup_program())
param_list = fluid.default_main_program().block(0).all_parameters()
# process weight and bias
gate_w_name = "gru_unit/BasicGRUUnit_0.w_0"
gate_b_name = "gru_unit/BasicGRUUnit_0.b_0"
candidate_w_name = "gru_unit/BasicGRUUnit_0.w_1"
candidate_b_name = "gru_unit/BasicGRUUnit_0.b_1"
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
candidate_w = np.array(
fluid.global_scope().find_var(candidate_w_name).get_tensor()
)
candidate_w = np.random.uniform(
-0.1, 0.1, size=candidate_w.shape
).astype('float32')
fluid.global_scope().find_var(candidate_w_name).get_tensor().set(
candidate_w, place
)
candidate_b = np.array(
fluid.global_scope().find_var(candidate_b_name).get_tensor()
)
candidate_b = np.random.uniform(
-0.1, 0.1, size=candidate_b.shape
).astype('float32')
fluid.global_scope().find_var(candidate_b_name).get_tensor().set(
candidate_b, place
)
step_input_np = np.random.uniform(
-0.1, 0.1, (self.batch_size, self.hidden_size)
).astype('float32')
pre_hidden_np = np.random.uniform(
-0.1, 0.1, (self.batch_size, self.hidden_size)
).astype('float32')
out = exe.run(
feed={'x': step_input_np, 'pre_hidden': pre_hidden_np},
fetch_list=[new_hidden],
)
api_out = out[0]
np_out = step(
step_input_np,
pre_hidden_np,
gate_w,
gate_b,
candidate_w,
candidate_b,
)
np.testing.assert_allclose(api_out, np_out, rtol=0.0001, atol=0)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid import framework
from paddle.fluid.contrib.layers import basic_lstm
from paddle.fluid.executor import Executor
np.random.seed(123)
SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
EXP_MAX_INPUT = 40.0
def sigmoid(x):
y = np.copy(x)
y[x < SIGMOID_THRESHOLD_MIN] = SIGMOID_THRESHOLD_MIN
y[x > SIGMOID_THRESHOLD_MAX] = SIGMOID_THRESHOLD_MAX
return 1.0 / (1.0 + np.exp(-y))
def tanh(x):
y = -2.0 * x
y[y > EXP_MAX_INPUT] = EXP_MAX_INPUT
return (2.0 / (1.0 + np.exp(y))) - 1.0
def lstm_np(
input,
init_h,
init_c,
hidden_size,
gate_weight,
gate_bias,
num_layers=1,
batch_first=False,
is_bidirect=False,
sequence_length=None,
forget_bias=1.0,
):
def step(step_in, pre_hidden, pre_cell, gate_w, gate_b):
concat_1 = np.concatenate([step_in, pre_hidden], 1)
gate_input = np.matmul(concat_1, gate_w)
gate_input += gate_b
i, j, f, o = np.split(gate_input, indices_or_sections=4, axis=1)
new_cell = pre_cell * sigmoid(f + forget_bias) + sigmoid(i) * tanh(j)
new_hidden = tanh(new_cell) * sigmoid(o)
return new_hidden, new_cell
mask = None
if batch_first:
input = np.transpose(input, [1, 0, 2])
if mask is not None:
mask = np.transpose(mask, [1, 0])
batch_size = input.shape[1]
if sequence_length is not None:
max_seq_len = input.shape[0]
mask = np.zeros([batch_size, max_seq_len])
for i, len in enumerate(sequence_length):
mask[i, :len] = 1.0
mask = np.transpose(mask, [1, 0])
direc_num = 1
if is_bidirect:
direc_num = 2
if init_h is not None:
init_h = np.reshape(init_h, [num_layers, direc_num, -1, hidden_size])
init_c = np.reshape(init_c, [num_layers, direc_num, -1, hidden_size])
else:
init_h = np.zeros([num_layers, direc_num, batch_size, hidden_size])
init_c = np.zeros([num_layers, direc_num, batch_size, hidden_size])
def get_single_direction_output(rnn_input, mask=None, direc_index=0):
seq_len = rnn_input.shape[0]
output = []
# init pre hidden
pre_hidden_array = []
pre_cell_array = []
for i in range(num_layers):
pre_hidden_array.append(init_h[i, direc_index])
pre_cell_array.append(init_c[i, direc_index])
for i in range(seq_len):
step_input = rnn_input[i]
if mask is not None:
step_mask = mask[i]
step_mask = np.reshape(step_mask, [-1, 1])
# print("np mask", step_mask.shape )
for i in range(num_layers):
new_hidden, new_cell = step(
step_input,
pre_hidden_array[i],
pre_cell_array[i],
gate_weight[direc_index * num_layers + i],
gate_bias[direc_index * num_layers + i],
)
if mask is not None:
new_hidden = np.multiply(
new_hidden, step_mask
) - np.multiply(pre_hidden_array[i], (step_mask - 1.0))
# new_hidden = new_hidden * step_mask - pre_hidden_array[i] * ( step_mask -1 )
# new_cell = new_cell * step_mask - pre_cell_array[i] * (step_mask -1)
new_cell = np.multiply(new_cell, step_mask) - np.multiply(
pre_cell_array[i], (step_mask - 1.0)
)
pre_hidden_array[i] = new_hidden
pre_cell_array[i] = new_cell
step_input = new_hidden
output.append(step_input)
rnn_out = np.concatenate(output, 0)
rnn_out = np.reshape(rnn_out, [seq_len, -1, hidden_size])
last_hidden_out = np.concatenate(pre_hidden_array, 0)
last_hidden_out = np.reshape(
last_hidden_out, [num_layers, -1, hidden_size]
)
last_cell_out = np.concatenate(pre_cell_array, 0)
last_cell_out = np.reshape(last_cell_out, [num_layers, -1, hidden_size])
return rnn_out, last_hidden_out, last_cell_out
fw_rnn_out, fw_last_hidden, fw_last_cell = get_single_direction_output(
input, mask, direc_index=0
)
if is_bidirect:
bw_input = input[::-1]
bw_mask = None
if mask is not None:
bw_mask = mask[::-1]
bw_rnn_out, bw_last_hidden, bw_last_cell = get_single_direction_output(
bw_input, bw_mask, direc_index=1
)
bw_rnn_out = bw_rnn_out[::-1]
rnn_out = np.concatenate([fw_rnn_out, bw_rnn_out], 2)
last_hidden = np.concatenate([fw_last_hidden, bw_last_hidden], 1)
last_hidden = np.reshape(
last_hidden, [num_layers * direc_num, -1, hidden_size]
)
last_cell = np.concatenate([fw_last_cell, bw_last_cell], 1)
last_cell = np.reshape(
last_cell, [num_layers * direc_num, -1, hidden_size]
)
if batch_first:
rnn_out = np.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden, last_cell
else:
rnn_out = fw_rnn_out
last_hidden = fw_last_hidden
last_cell = fw_last_cell
if batch_first:
rnn_out = np.transpose(rnn_out, [1, 0, 2])
return rnn_out, last_hidden, last_cell
class TestBasicLSTMApi(unittest.TestCase):
def setUp(self):
self.hidden_size = 10
self.batch_size = 5
self.seq_len = 6
self.num_layers = 2
self.is_bidirect = True
self.batch_first = False
self.forget_bias = 1.0
def test_run(self):
x = layers.data(
name='x',
shape=[-1, self.batch_size, self.hidden_size],
dtype='float32',
)
sequence_length = layers.data(
name="sequence_length", shape=[-1], dtype='float32'
)
rnn_out, last_hidden, last_cell = basic_lstm(
x,
None,
None,
self.hidden_size,
num_layers=self.num_layers,
batch_first=self.batch_first,
bidirectional=self.is_bidirect,
sequence_length=sequence_length,
forget_bias=self.forget_bias,
)
last_hidden.persistable = True
rnn_out.persistable = True
if core.is_compiled_with_cuda():
place = core.CUDAPlace(0)
else:
place = core.CPUPlace()
exe = Executor(place)
exe.run(framework.default_startup_program())
param_list = fluid.default_main_program().block(0).all_parameters()
# process weight and bias
gate_weight = []
gate_bias = []
for i in range(self.num_layers):
gate_w_name = "basic_lstm_layers_" + str(i) + "/BasicLSTMUnit_0.w_0"
gate_b_name = "basic_lstm_layers_" + str(i) + "/BasicLSTMUnit_0.b_0"
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
gate_weight.append(gate_w)
gate_bias.append(gate_b)
if self.is_bidirect:
for i in range(self.num_layers):
gate_w_name = (
"basic_lstm_reverse_layers_"
+ str(i)
+ "/BasicLSTMUnit_0.w_0"
)
gate_b_name = (
"basic_lstm_reverse_layers_"
+ str(i)
+ "/BasicLSTMUnit_0.b_0"
)
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
gate_weight.append(gate_w)
gate_bias.append(gate_b)
step_input_np = np.random.uniform(
-0.1, 0.1, (self.seq_len, self.batch_size, self.hidden_size)
).astype('float32')
sequence_length_np = np.random.randint(
self.seq_len // 2, self.seq_len, size=(self.batch_size)
).astype('int64')
out = exe.run(
feed={'x': step_input_np, 'sequence_length': sequence_length_np},
fetch_list=[rnn_out, last_hidden, last_cell],
)
api_rnn_out = out[0]
api_last_hidden = out[1]
api_last_cell = out[2]
np_out = lstm_np(
step_input_np,
None,
None,
self.hidden_size,
gate_weight,
gate_bias,
num_layers=self.num_layers,
batch_first=self.batch_first,
is_bidirect=self.is_bidirect,
sequence_length=sequence_length_np,
)
np.testing.assert_allclose(api_rnn_out, np_out[0], rtol=0.0001, atol=0)
np.testing.assert_allclose(
api_last_hidden, np_out[1], rtol=0.0001, atol=0
)
np.testing.assert_allclose(
api_last_cell, np_out[2], rtol=0.0001, atol=0
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid import framework
from paddle.fluid.contrib.layers import BasicLSTMUnit
from paddle.fluid.executor import Executor
np.random.seed(123)
SIGMOID_THRESHOLD_MIN = -40.0
SIGMOID_THRESHOLD_MAX = 13.0
EXP_MAX_INPUT = 40.0
def sigmoid(x):
y = np.copy(x)
y[x < SIGMOID_THRESHOLD_MIN] = SIGMOID_THRESHOLD_MIN
y[x > SIGMOID_THRESHOLD_MAX] = SIGMOID_THRESHOLD_MAX
return 1.0 / (1.0 + np.exp(-y))
def tanh(x):
y = -2.0 * x
y[y > EXP_MAX_INPUT] = EXP_MAX_INPUT
return (2.0 / (1.0 + np.exp(y))) - 1.0
def step(step_in, pre_hidden, pre_cell, gate_w, gate_b, forget_bias=1.0):
concat_1 = np.concatenate([step_in, pre_hidden], 1)
gate_input = np.matmul(concat_1, gate_w)
gate_input += gate_b
i, j, f, o = np.split(gate_input, indices_or_sections=4, axis=1)
new_cell = pre_cell * sigmoid(f + forget_bias) + sigmoid(i) * tanh(j)
new_hidden = tanh(new_cell) * sigmoid(o)
return new_hidden, new_cell
class TestBasicLSTMUnit(unittest.TestCase):
def setUp(self):
self.hidden_size = 5
self.batch_size = 5
def test_run(self):
x = layers.data(name='x', shape=[-1, self.hidden_size], dtype='float32')
pre_hidden = layers.data(
name="pre_hidden", shape=[-1, self.hidden_size], dtype='float32'
)
pre_cell = layers.data(
name="pre_cell", shape=[-1, self.hidden_size], dtype='float32'
)
lstm_unit = BasicLSTMUnit("lstm_unit", self.hidden_size)
new_hidden, new_cell = lstm_unit(x, pre_hidden, pre_cell)
new_hidden.persistable = True
new_cell.persistable = True
if core.is_compiled_with_cuda():
place = core.CUDAPlace(0)
else:
place = core.CPUPlace()
exe = Executor(place)
exe.run(framework.default_startup_program())
param_list = fluid.default_main_program().block(0).all_parameters()
# process weight and bias
gate_w_name = "lstm_unit/BasicLSTMUnit_0.w_0"
gate_b_name = "lstm_unit/BasicLSTMUnit_0.b_0"
gate_w = np.array(
fluid.global_scope().find_var(gate_w_name).get_tensor()
)
gate_w = np.random.uniform(-0.1, 0.1, size=gate_w.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_w_name).get_tensor().set(
gate_w, place
)
gate_b = np.array(
fluid.global_scope().find_var(gate_b_name).get_tensor()
)
gate_b = np.random.uniform(-0.1, 0.1, size=gate_b.shape).astype(
'float32'
)
fluid.global_scope().find_var(gate_b_name).get_tensor().set(
gate_b, place
)
step_input_np = np.random.uniform(
-0.1, 0.1, (self.batch_size, self.hidden_size)
).astype('float32')
pre_hidden_np = np.random.uniform(
-0.1, 0.1, (self.batch_size, self.hidden_size)
).astype('float32')
pre_cell_np = np.random.uniform(
-0.1, 0.1, (self.batch_size, self.hidden_size)
).astype('float32')
out = exe.run(
feed={
'x': step_input_np,
'pre_hidden': pre_hidden_np,
'pre_cell': pre_cell_np,
},
fetch_list=[new_hidden, new_cell],
)
api_hidden_out = out[0]
api_cell_out = out[1]
np_hidden_out, np_cell_out = step(
step_input_np, pre_hidden_np, pre_cell_np, gate_w, gate_b
)
np.testing.assert_allclose(
api_hidden_out, np_hidden_out, rtol=0.0001, atol=0
)
np.testing.assert_allclose(
api_cell_out, np_cell_out, rtol=0.0001, atol=0
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test_imperative_base import new_program_scope
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from paddle.fluid.contrib.layers import basic_gru, basic_lstm
class TestBasicGRUApiName(unittest.TestCase):
def setUp(self):
self.name_set = set(
[
"test1_fw_w_0_gate",
"test1_fw_w_0_candidate",
"test1_fw_b_0_gate",
"test1_fw_b_0_candidate",
"test1_bw_w_0_gate",
"test1_bw_w_0_candidate",
"test1_bw_b_0_gate",
"test1_bw_b_0_candidate",
]
)
def test_name(self):
batch_size = 20
input_size = 128
hidden_size = 256
num_layers = 1
dropout = 0.5
bidirectional = True
batch_first = False
with new_program_scope():
input = layers.data(
name="input",
shape=[-1, batch_size, input_size],
dtype='float32',
)
pre_hidden = layers.data(
name="pre_hidden", shape=[-1, hidden_size], dtype='float32'
)
sequence_length = layers.data(
name="sequence_length", shape=[-1], dtype='int32'
)
rnn_out, last_hidden = basic_gru(
input,
pre_hidden,
hidden_size,
num_layers=num_layers,
sequence_length=sequence_length,
dropout_prob=dropout,
bidirectional=bidirectional,
batch_first=batch_first,
param_attr=fluid.ParamAttr(name="test1"),
bias_attr=fluid.ParamAttr(name="test1"),
name="basic_gru",
)
var_list = fluid.io.get_program_parameter(
fluid.default_main_program()
)
for var in var_list:
self.assertTrue(var.name in self.name_set)
class TestBasicLSTMApiName(unittest.TestCase):
def setUp(self):
self.name_set = set(
[
"test1_fw_w_0",
"test1_fw_b_0",
"test1_fw_w_1",
"test1_fw_b_1",
"test1_bw_w_0",
"test1_bw_b_0",
"test1_bw_w_1",
"test1_bw_b_1",
]
)
def test_name(self):
batch_size = 20
input_size = 128
hidden_size = 256
num_layers = 2
dropout = 0.5
bidirectional = True
batch_first = False
with new_program_scope():
input = layers.data(
name="input",
shape=[-1, batch_size, input_size],
dtype='float32',
)
pre_hidden = layers.data(
name="pre_hidden", shape=[-1, hidden_size], dtype='float32'
)
pre_cell = layers.data(
name="pre_cell", shape=[-1, hidden_size], dtype='float32'
)
sequence_length = layers.data(
name="sequence_length", shape=[-1], dtype='int32'
)
rnn_out, last_hidden, last_cell = basic_lstm(
input,
pre_hidden,
pre_cell,
hidden_size,
num_layers=num_layers,
sequence_length=sequence_length,
dropout_prob=dropout,
bidirectional=bidirectional,
param_attr=fluid.ParamAttr(name="test1"),
bias_attr=fluid.ParamAttr(name="test1"),
batch_first=batch_first,
)
var_list = fluid.io.get_program_parameter(
fluid.default_main_program()
)
for var in var_list:
self.assertTrue(var.name in self.name_set)
if __name__ == '__main__':
unittest.main()
......@@ -20,8 +20,6 @@ import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from paddle.fluid import ParamAttr
from paddle.fluid.contrib.layers import basic_lstm
from paddle.fluid.executor import Executor
from paddle.fluid.layers.control_flow import StaticRNN as PaddingRNN
......@@ -85,7 +83,7 @@ class RNNConfig:
else:
raise ValueError('Unsupported model_type.')
if rnn_model not in ('static', 'padding', 'cudnn', 'basic_lstm'):
if rnn_model not in ('static', 'padding', 'cudnn'):
raise ValueError('Unsupported rnn_model.')
self.batch_size = 12
......@@ -406,23 +404,6 @@ def lm_model(
init_hidden=init_hidden_reshape,
init_cell=init_cell_reshape,
)
elif rnn_model == "basic_lstm":
rnn_out, last_hidden, last_cell = basic_lstm(
x_emb,
init_hidden,
init_cell,
hidden_size,
num_layers=num_layers,
batch_first=True,
dropout_prob=dropout,
param_attr=ParamAttr(
initializer=fluid.initializer.UniformInitializer(
low=-init_scale, high=init_scale
)
),
bias_attr=ParamAttr(initializer=fluid.initializer.Constant(0.0)),
forget_bias=0.0,
)
else:
print("type not support")
return
......
......@@ -2329,33 +2329,6 @@ class TestBook(LayerTest):
)
return output
def test_basic_gru(self):
input_size = 128
hidden_size = 256
with self.static_graph():
input = fluid.data(
name="input", shape=[None, None, input_size], dtype='float32'
)
pre_hidden = fluid.data(
name="pre_hidden", shape=[None, hidden_size], dtype='float32'
)
sequence_length = fluid.data(
name="sequence_length", shape=[None], dtype='int32'
)
for bidirectional in [True, False]:
for batch_first in [True, False]:
rnn_out, last_hidden = fluid.contrib.layers.basic_gru(
input,
pre_hidden,
hidden_size=256,
num_layers=2,
sequence_length=sequence_length,
dropout_prob=0.5,
bidirectional=bidirectional,
batch_first=batch_first,
)
class ExampleNet(paddle.nn.Layer):
def __init__(self):
......
......@@ -274,7 +274,6 @@ HIGH_PARALLEL_JOB_NEW = [
'test_mkldnn_op_nhwc',
'test_fc_act_mkldnn_fuse_pass',
'test_fleet_base_3',
'test_basic_rnn_name',
'test_query_op',
'test_fleet_base_4',
'save_load_op_test',
......@@ -1980,7 +1979,6 @@ CPU_PARALLEL_JOB = [
'test_beam_search_op',
'test_batch_sampler',
'test_batch_norm_act_fuse_pass',
'test_basic_rnn_name',
'test_attention_lstm_op',
'test_analyzer',
'test_aligned_allocator',
......
......@@ -71,7 +71,6 @@ STATIC_MODE_TESTING_LIST = [
'test_auc_single_pred_op',
'test_avoid_twice_initialization',
'test_backward',
'test_basic_rnn_name',
'test_batch_norm_op',
'test_batch_norm_op_v2',
'test_bce_loss',
......