kie_sdmgr_loss.py 4.6 KB
Newer Older
L
add kie  
LDOUBLEV 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
# copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from paddle import nn
import paddle


class SDMGRLoss(nn.Layer):
    def __init__(self, node_weight=1.0, edge_weight=1.0, ignore=0):
        super().__init__()
        self.loss_node = nn.CrossEntropyLoss(ignore_index=ignore)
        self.loss_edge = nn.CrossEntropyLoss(ignore_index=-1)
        self.node_weight = node_weight
        self.edge_weight = edge_weight
        self.ignore = ignore

    def pre_process(self, gts, tag):
        gts, tag = gts.numpy(), tag.numpy().tolist()
        temp_gts = []
        batch = len(tag)
        for i in range(batch):
            num, recoder_len = tag[i][0], tag[i][1]
            temp_gts.append(
                paddle.to_tensor(
                    gts[i, :num, :num + 1], dtype='int64'))
        return temp_gts

    def accuracy(self, pred, target, topk=1, thresh=None):
        """Calculate accuracy according to the prediction and target.

        Args:
            pred (torch.Tensor): The model prediction, shape (N, num_class)
            target (torch.Tensor): The target of each prediction, shape (N, )
            topk (int | tuple[int], optional): If the predictions in ``topk``
                matches the target, the predictions will be regarded as
                correct ones. Defaults to 1.
            thresh (float, optional): If not None, predictions with scores under
                this threshold are considered incorrect. Default to None.

        Returns:
            float | tuple[float]: If the input ``topk`` is a single integer,
                the function will return a single float as accuracy. If
                ``topk`` is a tuple containing multiple integers, the
                function will return a tuple containing accuracies of
                each ``topk`` number.
        """
        assert isinstance(topk, (int, tuple))
        if isinstance(topk, int):
            topk = (topk, )
            return_single = True
        else:
            return_single = False

        maxk = max(topk)
        if pred.shape[0] == 0:
            accu = [pred.new_tensor(0.) for i in range(len(topk))]
            return accu[0] if return_single else accu
        pred_value, pred_label = paddle.topk(pred, maxk, axis=1)
        pred_label = pred_label.transpose(
            [1, 0])  # transpose to shape (maxk, N)
        correct = paddle.equal(pred_label,
                               (target.reshape([1, -1]).expand_as(pred_label)))
        res = []
        for k in topk:
            correct_k = paddle.sum(correct[:k].reshape([-1]).astype('float32'),
                                   axis=0,
                                   keepdim=True)
            res.append(
                paddle.multiply(correct_k,
                                paddle.to_tensor(100.0 / pred.shape[0])))
        return res[0] if return_single else res

    def forward(self, pred, batch):
        node_preds, edge_preds = pred
        gts, tag = batch[4], batch[5]
        gts = self.pre_process(gts, tag)
        node_gts, edge_gts = [], []
        for gt in gts:
            node_gts.append(gt[:, 0])
            edge_gts.append(gt[:, 1:].reshape([-1]))
        node_gts = paddle.concat(node_gts)
        edge_gts = paddle.concat(edge_gts)

        node_valids = paddle.nonzero(node_gts != self.ignore).reshape([-1])
        edge_valids = paddle.nonzero(edge_gts != -1).reshape([-1])
        loss_node = self.loss_node(node_preds, node_gts)
        loss_edge = self.loss_edge(edge_preds, edge_gts)
        loss = self.node_weight * loss_node + self.edge_weight * loss_edge
        return dict(
            loss=loss,
            loss_node=loss_node,
            loss_edge=loss_edge,
            acc_node=self.accuracy(
                paddle.gather(node_preds, node_valids),
                paddle.gather(node_gts, node_valids)),
            acc_edge=self.accuracy(
                paddle.gather(edge_preds, edge_valids),
                paddle.gather(edge_gts, edge_valids)))