# copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import from __future__ import division from __future__ import print_function import paddle import paddle.nn as nn import paddle.nn.functional as F import numpy as np class TableAttentionHead(nn.Layer): def __init__(self, in_channels, hidden_size, loc_type, in_max_len=488, **kwargs): super(TableAttentionHead, self).__init__() self.input_size = in_channels[-1] self.hidden_size = hidden_size self.elem_num = 30 self.max_text_length = 100 self.max_elem_length = 500 self.max_cell_num = 500 self.structure_attention_cell = AttentionGRUCell( self.input_size, hidden_size, self.elem_num, use_gru=False) self.structure_generator = nn.Linear(hidden_size, self.elem_num) self.loc_type = loc_type self.in_max_len = in_max_len if self.loc_type == 1: self.loc_generator = nn.Linear(hidden_size, 4) else: if self.in_max_len == 640: self.loc_fea_trans = nn.Linear(400, self.max_elem_length+1) elif self.in_max_len == 800: self.loc_fea_trans = nn.Linear(625, self.max_elem_length+1) else: self.loc_fea_trans = nn.Linear(256, self.max_elem_length+1) self.loc_generator = nn.Linear(self.input_size + hidden_size, 4) def _char_to_onehot(self, input_char, onehot_dim): input_ont_hot = F.one_hot(input_char, onehot_dim) return input_ont_hot def forward(self, inputs, targets=None): # if and else branch are both needed when you want to assign a variable # if you modify the var in just one branch, then the modification will not work. fea = inputs[-1] if len(fea.shape) == 3: pass else: last_shape = int(np.prod(fea.shape[2:])) # gry added fea = paddle.reshape(fea, [fea.shape[0], fea.shape[1], last_shape]) fea = fea.transpose([0, 2, 1]) # (NTC)(batch, width, channels) batch_size = fea.shape[0] hidden = paddle.zeros((batch_size, self.hidden_size)) output_hiddens = [] if self.training and targets is not None: structure = targets[0] for i in range(self.max_elem_length+1): elem_onehots = self._char_to_onehot( structure[:, i], onehot_dim=self.elem_num) (outputs, hidden), alpha = self.structure_attention_cell( hidden, fea, elem_onehots) output_hiddens.append(paddle.unsqueeze(outputs, axis=1)) output = paddle.concat(output_hiddens, axis=1) structure_probs = self.structure_generator(output) if self.loc_type == 1: loc_preds = self.loc_generator(output) loc_preds = F.sigmoid(loc_preds) else: loc_fea = fea.transpose([0, 2, 1]) loc_fea = self.loc_fea_trans(loc_fea) loc_fea = loc_fea.transpose([0, 2, 1]) loc_concat = paddle.concat([output, loc_fea], axis=2) loc_preds = self.loc_generator(loc_concat) loc_preds = F.sigmoid(loc_preds) else: temp_elem = paddle.zeros(shape=[batch_size], dtype="int32") structure_probs = None loc_preds = None elem_onehots = None outputs = None alpha = None max_elem_length = paddle.to_tensor(self.max_elem_length) i = 0 while i < max_elem_length+1: elem_onehots = self._char_to_onehot( temp_elem, onehot_dim=self.elem_num) (outputs, hidden), alpha = self.structure_attention_cell( hidden, fea, elem_onehots) output_hiddens.append(paddle.unsqueeze(outputs, axis=1)) structure_probs_step = self.structure_generator(outputs) temp_elem = structure_probs_step.argmax(axis=1, dtype="int32") i += 1 output = paddle.concat(output_hiddens, axis=1) structure_probs = self.structure_generator(output) structure_probs = F.softmax(structure_probs) if self.loc_type == 1: loc_preds = self.loc_generator(output) loc_preds = F.sigmoid(loc_preds) else: loc_fea = fea.transpose([0, 2, 1]) loc_fea = self.loc_fea_trans(loc_fea) loc_fea = loc_fea.transpose([0, 2, 1]) loc_concat = paddle.concat([output, loc_fea], axis=2) loc_preds = self.loc_generator(loc_concat) loc_preds = F.sigmoid(loc_preds) return {'structure_probs':structure_probs, 'loc_preds':loc_preds} class AttentionGRUCell(nn.Layer): def __init__(self, input_size, hidden_size, num_embeddings, use_gru=False): super(AttentionGRUCell, self).__init__() self.i2h = nn.Linear(input_size, hidden_size, bias_attr=False) self.h2h = nn.Linear(hidden_size, hidden_size) self.score = nn.Linear(hidden_size, 1, bias_attr=False) self.rnn = nn.GRUCell( input_size=input_size + num_embeddings, hidden_size=hidden_size) self.hidden_size = hidden_size def forward(self, prev_hidden, batch_H, char_onehots): batch_H_proj = self.i2h(batch_H) prev_hidden_proj = paddle.unsqueeze(self.h2h(prev_hidden), axis=1) res = paddle.add(batch_H_proj, prev_hidden_proj) res = paddle.tanh(res) e = self.score(res) alpha = F.softmax(e, axis=1) alpha = paddle.transpose(alpha, [0, 2, 1]) context = paddle.squeeze(paddle.mm(alpha, batch_H), axis=1) concat_context = paddle.concat([context, char_onehots], 1) cur_hidden = self.rnn(concat_context, prev_hidden) return cur_hidden, alpha class AttentionLSTM(nn.Layer): def __init__(self, in_channels, out_channels, hidden_size, **kwargs): super(AttentionLSTM, self).__init__() self.input_size = in_channels self.hidden_size = hidden_size self.num_classes = out_channels self.attention_cell = AttentionLSTMCell( in_channels, hidden_size, out_channels, use_gru=False) self.generator = nn.Linear(hidden_size, out_channels) def _char_to_onehot(self, input_char, onehot_dim): input_ont_hot = F.one_hot(input_char, onehot_dim) return input_ont_hot def forward(self, inputs, targets=None, batch_max_length=25): batch_size = inputs.shape[0] num_steps = batch_max_length hidden = (paddle.zeros((batch_size, self.hidden_size)), paddle.zeros( (batch_size, self.hidden_size))) output_hiddens = [] if targets is not None: for i in range(num_steps): # one-hot vectors for a i-th char char_onehots = self._char_to_onehot( targets[:, i], onehot_dim=self.num_classes) hidden, alpha = self.attention_cell(hidden, inputs, char_onehots) hidden = (hidden[1][0], hidden[1][1]) output_hiddens.append(paddle.unsqueeze(hidden[0], axis=1)) output = paddle.concat(output_hiddens, axis=1) probs = self.generator(output) else: targets = paddle.zeros(shape=[batch_size], dtype="int32") probs = None for i in range(num_steps): char_onehots = self._char_to_onehot( targets, onehot_dim=self.num_classes) hidden, alpha = self.attention_cell(hidden, inputs, char_onehots) probs_step = self.generator(hidden[0]) hidden = (hidden[1][0], hidden[1][1]) if probs is None: probs = paddle.unsqueeze(probs_step, axis=1) else: probs = paddle.concat( [probs, paddle.unsqueeze( probs_step, axis=1)], axis=1) next_input = probs_step.argmax(axis=1) targets = next_input return probs class AttentionLSTMCell(nn.Layer): def __init__(self, input_size, hidden_size, num_embeddings, use_gru=False): super(AttentionLSTMCell, self).__init__() self.i2h = nn.Linear(input_size, hidden_size, bias_attr=False) self.h2h = nn.Linear(hidden_size, hidden_size) self.score = nn.Linear(hidden_size, 1, bias_attr=False) if not use_gru: self.rnn = nn.LSTMCell( input_size=input_size + num_embeddings, hidden_size=hidden_size) else: self.rnn = nn.GRUCell( input_size=input_size + num_embeddings, hidden_size=hidden_size) self.hidden_size = hidden_size def forward(self, prev_hidden, batch_H, char_onehots): batch_H_proj = self.i2h(batch_H) prev_hidden_proj = paddle.unsqueeze(self.h2h(prev_hidden[0]), axis=1) res = paddle.add(batch_H_proj, prev_hidden_proj) res = paddle.tanh(res) e = self.score(res) alpha = F.softmax(e, axis=1) alpha = paddle.transpose(alpha, [0, 2, 1]) context = paddle.squeeze(paddle.mm(alpha, batch_H), axis=1) concat_context = paddle.concat([context, char_onehots], 1) cur_hidden = self.rnn(concat_context, prev_hidden) return cur_hidden, alpha