# det_pse_loss.py
# Committed by WenmuZhou
# -*- coding: utf-8 -*-
# @Time    : 3/29/19 11:03 AM
# @Author  : zhoujun
import paddle
from paddle import nn
from paddle.nn import functional as F
import numpy as np
from ppocr.utils.iou import iou


class PSELoss(nn.Layer):
    """PSE loss: dice loss on a text map plus dice loss on kernel maps.

    The text term uses online hard example mining (OHEM) to pick which
    pixels contribute; the kernel term is the mean dice loss over all kernel
    channels. The two are blended as
    ``alpha * loss_text + (1 - alpha) * loss_kernels``.
    """

    def __init__(self, alpha, ohem_ratio=3, kernel_sample_mask='pred', reduction='sum', **kwargs):
        """Implement PSE Loss.

        Args:
            alpha: Weight of the text loss; the kernel loss gets ``1 - alpha``.
            ohem_ratio: Maximum number of negative pixels kept per positive
                pixel when building the OHEM mask for the text loss.
            kernel_sample_mask: ``'gt'`` masks the kernel loss with the
                ground-truth text map, ``'pred'`` with the thresholded
                predicted text map. Any other value leaves the OHEM text
                mask in place for the kernel loss.
            reduction: ``'sum'``, ``'mean'`` or ``'none'``; applied to every
                entry of the dict returned by :meth:`forward`.
            **kwargs: Accepted and ignored.
        """
        super(PSELoss, self).__init__()
        assert reduction in ['sum', 'mean', 'none']
        self.alpha = alpha
        self.ohem_ratio = ohem_ratio
        self.kernel_sample_mask = kernel_sample_mask
        self.reduction = reduction

    def forward(self, outputs, labels):
        """Compute the PSE losses and IoU metrics for one batch.

        Args:
            outputs: Mapping whose ``'maps'`` entry is the 4-D prediction
                tensor; channel 0 is read as the text map and the remaining
                channels as kernel maps.
            labels: Sequence whose elements from index 1 on are
                ``(gt_texts, gt_kernels, training_masks)``.

        Returns:
            dict with keys ``loss_text``, ``iou_text``, ``loss_kernels``,
            ``iou_kernel`` and ``loss``, each reduced according to
            ``self.reduction``.
        """
        predicts = outputs['maps']
        # Upsample the prediction by 4x before comparing with the labels
        # (presumably the head predicts at 1/4 resolution - confirm).
        predicts = F.interpolate(predicts, scale_factor=4)

        texts = predicts[:, 0, :, :]
        kernels = predicts[:, 1:, :, :]
        gt_texts, gt_kernels, training_masks = labels[1:]

        # text loss
        # Pass the configured ratio; previously the method default was
        # always used and the constructor argument had no effect.
        selected_masks = self.ohem_batch(texts, gt_texts, training_masks, self.ohem_ratio)

        loss_text = self.dice_loss(texts, gt_texts, selected_masks)
        # Logit > 0 is the same decision as sigmoid > 0.5.
        iou_text = iou((texts > 0).astype('int64'), gt_texts, training_masks, reduce=False)
        losses = dict(
            loss_text=loss_text,
            iou_text=iou_text
        )

        # kernel loss
        if self.kernel_sample_mask == 'gt':
            selected_masks = gt_texts * training_masks
        elif self.kernel_sample_mask == 'pred':
            selected_masks = (F.sigmoid(texts) > 0.5).astype('float32') * training_masks
        # NOTE: for any other value the OHEM text mask computed above is
        # reused for the kernel loss.

        loss_kernels = [
            self.dice_loss(kernels[:, i, :, :], gt_kernels[:, i, :, :], selected_masks)
            for i in range(kernels.shape[1])
        ]
        # Average the per-sample dice loss over the kernel channels.
        loss_kernels = paddle.mean(paddle.stack(loss_kernels, axis=1), axis=1)
        # The kernel IoU is measured on the last kernel channel only.
        iou_kernel = iou(
            (kernels[:, -1, :, :] > 0).astype('int64'), gt_kernels[:, -1, :, :], training_masks * gt_texts,
            reduce=False)
        losses.update(dict(
            loss_kernels=loss_kernels,
            iou_kernel=iou_kernel
        ))
        loss = self.alpha * loss_text + (1 - self.alpha) * loss_kernels
        losses['loss'] = loss
        if self.reduction == 'sum':
            losses = {name: paddle.sum(value) for name, value in losses.items()}
        elif self.reduction == 'mean':
            losses = {name: paddle.mean(value) for name, value in losses.items()}
        return losses

    def dice_loss(self, input, target, mask):
        """Return the per-sample masked dice loss.

        Args:
            input: Raw scores (logits); a sigmoid is applied here.
            target: Ground-truth map with the same leading (batch) dimension.
            mask: Multiplicative mask applied to both ``input`` and
                ``target`` before the overlap is computed.

        Returns:
            Tensor with one value per sample: ``1 - 2*a / (b + c)``, where
            ``a`` is the masked overlap and ``b`` and ``c`` are the masked
            squared sums of prediction and target.
        """
        input = F.sigmoid(input)

        # Flatten everything except the batch dimension.
        input = input.reshape([input.shape[0], -1])
        target = target.reshape([target.shape[0], -1])
        mask = mask.reshape([mask.shape[0], -1])

        input = input * mask
        target = target * mask

        a = paddle.sum(input * target, 1)
        # The 0.001 terms keep the denominator non-zero for empty maps.
        b = paddle.sum(input * input, 1) + 0.001
        c = paddle.sum(target * target, 1) + 0.001
        d = (2 * a) / (b + c)
        return 1 - d

    @staticmethod
    def _as_batch_mask(mask):
        """Return a 2-D ``mask`` as a float32 tensor with a leading axis of 1."""
        return mask.reshape([1, mask.shape[0], mask.shape[1]]).astype('float32')

    def ohem_single(self, score, gt_text, training_mask, ohem_ratio=3):
        """Build the OHEM selection mask for a single sample.

        Args:
            score: 2-D predicted text scores.
            gt_text: 2-D ground-truth text map; values > 0.5 are positives.
            training_mask: 2-D mask; values <= 0.5 mark ignored pixels.
            ohem_ratio: Maximum number of negatives kept per positive.

        Returns:
            float32 tensor of shape ``[1, H, W]``. When there are no usable
            positives, or no negatives to sample, the training mask itself is
            returned. Otherwise a pixel is selected when it is a positive or
            its score reaches the hard-negative threshold, and it is not
            ignored by ``training_mask``.
        """
        positive = gt_text > 0.5
        # Positives that are not masked out by the training mask.
        pos_num = int(paddle.sum(positive.astype('float32'))) - int(
            paddle.sum(paddle.logical_and(positive, (training_mask <= 0.5)).astype('float32')))

        if pos_num == 0:
            # selected_mask = gt_text.copy() * 0 # may be not good
            return self._as_batch_mask(training_mask)

        neg_num = int(paddle.sum((gt_text <= 0.5).astype('float32')))
        neg_num = int(min(pos_num * ohem_ratio, neg_num))

        if neg_num == 0:
            # Previously used the torch-style ``.view(1, H, W)``, which does
            # not match Paddle's ``view(shape_or_dtype)`` signature.
            return self._as_batch_mask(training_mask)

        # Threshold = the neg_num-th highest score among negative pixels
        # (ascending sort of the negated scores is a descending sort).
        neg_score = paddle.masked_select(score, gt_text <= 0.5)
        neg_score_sorted = paddle.sort(-neg_score)
        threshold = -neg_score_sorted[neg_num - 1]

        selected_mask = paddle.logical_and(paddle.logical_or((score >= threshold), positive),
                                           (training_mask > 0.5))
        return self._as_batch_mask(selected_mask)

    def ohem_batch(self, scores, gt_texts, training_masks, ohem_ratio=3):
        """Apply :meth:`ohem_single` to every sample and concatenate the masks.

        Args:
            scores: Predicted text scores, indexed ``[i, :, :]`` per sample.
            gt_texts: Ground-truth text maps, same indexing.
            training_masks: Training masks, same indexing.
            ohem_ratio: Forwarded to :meth:`ohem_single`.

        Returns:
            float32 tensor of the per-sample masks concatenated on axis 0.
        """
        selected_masks = [
            self.ohem_single(scores[i, :, :], gt_texts[i, :, :], training_masks[i, :, :], ohem_ratio)
            for i in range(scores.shape[0])
        ]
        return paddle.concat(selected_masks, 0).astype('float32')