# Losses

## Initial imports

In [3]:
import numpy as np
import pandas as pd
import torch


# increase displayed columns in jupyter notebook
pd.set_option("display.max_columns", 200)
pd.set_option("display.max_rows", 300)

In [52]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from pytorch_widedeep.wdtypes import *

class ZILNLoss(nn.Module):
    r"""Implementation of the `Zero Inflated LogNormal loss
    <https://arxiv.org/pdf/1912.07753.pdf>`
    """

    def __init__(self):
        super().__init__()

    def forward(self, input: Tensor, target: Tensor) -> Tensor:
        r"""
        Parameters
        ----------
        input: Tensor
            input tensor with predictions (not probabilities)
        target: Tensor
            target tensor with the actual classes

        Examples
        --------
        >>> import torch
        >>>
        >>> from pytorch_widedeep.losses import ZILNLoss
        >>>
        >>> # REGRESSION
        >>> target = torch.tensor([[0., 1.5]]).view(-1, 1)
        >>> input = torch.tensor([[.1, .2, .3], [.4, .5, .6]])
        >>> ZILNLoss()(input, target)
        tensor([0.6287, 1.9941])
        """
        positive = target>0
        positive = positive.float()

        assert input.shape == torch.Size([target.shape[0], 3]), "Wrong shape of input."
        positive_input = input[..., :1]

        classification_loss = F.binary_cross_entropy_with_logits(positive_input, positive)

        loc = input[..., 1:2]
        scale = torch.maximum(
            F.softplus(input[..., 2:]),
            torch.sqrt(torch.Tensor([torch.finfo(torch.float32).eps])))
        safe_labels = positive * target + (
            1 - positive) * torch.ones_like(target)

        regression_loss = -torch.mean(
            positive * torch.distributions.log_normal.LogNormal(loc=loc, scale=scale).log_prob(safe_labels),
            dim=-1)

        return classification_loss + regression_loss

In [53]:
target = torch.tensor([[0., 1.5]]).view(-1, 1)
input = torch.tensor([[.1, .2, .3], [.4, .5, .6]])
ZILNLoss()(input, target)

tensor([0.6287, 1.9941])

# Keras implementation - original

* https://github.com/google/lifetime_value/blob/master/lifetime_value/zero_inflated_lognormal.py

In [7]:
import tensorflow.compat.v1 as tf
import tensorflow_probability as tfp
tfd = tfp.distributions


def zero_inflated_lognormal_pred(logits: tf.Tensor) -> tf.Tensor:
  """Calculates predicted mean of zero inflated lognormal logits.
  Arguments:
    logits: [batch_size, 3] tensor of logits.
  Returns:
    preds: [batch_size, 1] tensor of predicted mean.
  """
  logits = tf.convert_to_tensor(logits, dtype=tf.float32)
  positive_probs = tf.keras.backend.sigmoid(logits[..., :1])
  loc = logits[..., 1:2]
  scale = tf.keras.backend.softplus(logits[..., 2:])
  preds = (
      positive_probs *
      tf.keras.backend.exp(loc + 0.5 * tf.keras.backend.square(scale)))
  return preds


def zero_inflated_lognormal_loss(labels: tf.Tensor,
                                 logits: tf.Tensor) -> tf.Tensor:
  """Computes the zero inflated lognormal loss.
  Usage with tf.keras API:
  ```python
  model = tf.keras.Model(inputs, outputs)
  model.compile('sgd', loss=zero_inflated_lognormal)
  ```
  Arguments:
    labels: True targets, tensor of shape [batch_size, 1].
    logits: Logits of output layer, tensor of shape [batch_size, 3].
  Returns:
    Zero inflated lognormal loss value.
  """
  labels = tf.convert_to_tensor(labels, dtype=tf.float32)
  positive = tf.cast(labels > 0, tf.float32)

  logits = tf.convert_to_tensor(logits, dtype=tf.float32)
  logits.shape.assert_is_compatible_with(
      tf.TensorShape(labels.shape[:-1].as_list() + [3]))

  positive_logits = logits[..., :1]
  classification_loss = tf.keras.losses.binary_crossentropy(
      y_true=positive, y_pred=positive_logits, from_logits=True)

  loc = logits[..., 1:2]
  scale = tf.math.maximum(
      tf.keras.backend.softplus(logits[..., 2:]),
      tf.math.sqrt(tf.keras.backend.epsilon()))
  safe_labels = positive * labels + (
      1 - positive) * tf.keras.backend.ones_like(labels)
  regression_loss = -tf.keras.backend.mean(
      positive * tfd.LogNormal(loc=loc, scale=scale).log_prob(safe_labels),
      axis=-1)

  return classification_loss + regression_loss

* https://github.com/google/lifetime_value/blob/master/lifetime_value/zero_inflated_lognormal_test.py

In [10]:
import numpy as np
from scipy import stats
import tensorflow.compat.v1 as tf


# Absolute error tolerance in asserting array near.
_ERR_TOL = 1e-6

# softplus function that calculates log(1+exp(x))
_softplus = lambda x: np.log(1.0 + np.exp(x))

# sigmoid function that calculates 1/(1+exp(-x))
_sigmoid = lambda x: 1 / (1 + np.exp(-x))


class ZeroInflatedLognormalLossTest():

  def setUp(self):
    super(ZeroInflatedLognormalLossTest, self).setUp()
    self.logits = np.array([[.1, .2, .3], [.4, .5, .6]])
    self.labels = np.array([[0.], [1.5]])

  def zero_inflated_lognormal(self, labels, logits):
    positive_logits = logits[..., :1]
    loss_zero = _softplus(positive_logits)
    loc = logits[..., 1:2]
    scale = np.maximum(
        _softplus(logits[..., 2:]),
        np.sqrt(tf.keras.backend.epsilon()))
    log_prob_non_zero = stats.lognorm.logpdf(
        x=labels, s=scale, loc=0, scale=np.exp(loc))
    loss_non_zero = _softplus(-positive_logits) - log_prob_non_zero
    return np.mean(np.where(labels == 0., loss_zero, loss_non_zero), axis=-1)

  def test_loss_value(self):
    expected_loss = self.zero_inflated_lognormal(self.labels, self.logits)
    loss = zero_inflated_lognormal.zero_inflated_lognormal_loss(
        self.labels, self.logits)
    self.assertArrayNear(self.evaluate(loss), expected_loss, _ERR_TOL)