tensorflow/models

View on GitHub
official/recommendation/uplift/metrics/poisson_metrics_test.py

Summary

Maintainability
C
1 day
Test Coverage
# Copyright 2024 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for poisson regression metrics."""

from absl.testing import parameterized
import tensorflow as tf, tf_keras

from official.recommendation.uplift import keras_test_case
from official.recommendation.uplift import types
from official.recommendation.uplift.metrics import poisson_metrics


def _get_two_tower_outputs(
    is_treatment: tf.Tensor,
    true_logits: tf.Tensor | None = None,
) -> types.TwoTowerTrainingOutputs:
  # Only the true_logits and is_treatment tensors are needed for testing.
  return types.TwoTowerTrainingOutputs(
      shared_embedding=tf.ones_like(is_treatment),
      control_predictions=tf.ones_like(is_treatment),
      treatment_predictions=tf.ones_like(is_treatment),
      uplift=tf.ones_like(is_treatment),
      control_logits=tf.ones_like(is_treatment),
      treatment_logits=tf.ones_like(is_treatment),
      true_logits=(
          true_logits if true_logits is not None else tf.ones_like(is_treatment)
      ),
      true_predictions=tf.ones_like(is_treatment),
      is_treatment=is_treatment,
  )


class LogLossTest(keras_test_case.KerasTestCase, parameterized.TestCase):

  @parameterized.named_parameters(
      {
          "testcase_name": "two_tower_outputs_not_sliced",
          "from_logits": True,
          "compute_full_loss": False,
          "slice_by_treatment": False,
          "y_true": tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              true_logits=tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
              is_treatment=tf.constant([[1], [0], [1], [0]]),
          ),
          "expected_loss": tf.reduce_mean(
              tf.nn.log_poisson_loss(
                  tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
                  tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
              )
          ),
      },
      {
          "testcase_name": "two_tower_outputs_sliced",
          "from_logits": True,
          "compute_full_loss": False,
          "slice_by_treatment": True,
          "y_true": tf.constant([[1], [0]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              true_logits=tf.constant([[1], [0]], dtype=tf.float32),
              is_treatment=tf.constant([[1], [0]]),
          ),
          "expected_loss": {
              "poisson_log_loss/treatment": (
                  tf.math.exp(1.0) - 1  # exp(1) - 1 * 1
              ),
              "poisson_log_loss/control": 1.0,  # exp(0) - 0 * 0
              "poisson_log_loss": ((tf.math.exp(1.0) - 1) + 1) / 2,
          },
      },
      {
          "testcase_name": "tensor_outputs_from_logits",
          "from_logits": True,
          "compute_full_loss": False,
          "slice_by_treatment": False,
          "y_true": tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
          "expected_loss": tf.reduce_mean(
              tf.nn.log_poisson_loss(
                  tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
                  tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
              )
          ),
      },
      {
          "testcase_name": "tensor_outputs_from_logits_full_loss",
          "from_logits": True,
          "compute_full_loss": True,
          "slice_by_treatment": False,
          "y_true": tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
          "expected_loss": tf.reduce_mean(
              tf.nn.log_poisson_loss(
                  tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
                  tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
                  compute_full_loss=True,
              )
          ),
      },
      {
          "testcase_name": "tensor_outputs_from_predictions",
          "from_logits": False,
          "compute_full_loss": False,
          "slice_by_treatment": False,
          "y_true": tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
          "expected_loss": tf.reduce_mean(
              tf.nn.log_poisson_loss(
                  tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
                  tf.math.log(
                      tf.constant([[1], [2], [3], [4]], dtype=tf.float32)
                  ),
              )
          ),
      },
      {
          "testcase_name": "tensor_outputs_from_predictions_full_loss",
          "from_logits": False,
          "compute_full_loss": True,
          "slice_by_treatment": False,
          "y_true": tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
          "expected_loss": tf.reduce_mean(
              tf.nn.log_poisson_loss(
                  tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
                  tf.math.log(
                      tf.constant([[1], [2], [3], [4]], dtype=tf.float32)
                  ),
                  compute_full_loss=True,
              )
          ),
      },
  )
  def test_metric_computes_correct_loss(
      self,
      from_logits: bool,
      compute_full_loss: bool,
      slice_by_treatment: bool,
      y_true: tf.Tensor,
      y_pred: tf.Tensor,
      expected_loss: tf.Tensor,
  ):
    metric = poisson_metrics.LogLoss(
        from_logits=from_logits,
        compute_full_loss=compute_full_loss,
        slice_by_treatment=slice_by_treatment,
    )
    metric.update_state(y_true, y_pred)
    self.assertAllClose(expected_loss, metric.result())

  @parameterized.product(
      from_logits=(True, False),
      compute_full_loss=(True, False),
  )
  def test_metric_is_configurable(
      self, from_logits: bool, compute_full_loss: bool
  ):
    metric = poisson_metrics.LogLoss(
        from_logits=from_logits,
        compute_full_loss=compute_full_loss,
        slice_by_treatment=False,
    )
    self.assertLayerConfigurable(
        layer=metric,
        y_true=tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
        y_pred=tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
        serializable=True,
    )


class LogLossMeanBaselineTest(
    keras_test_case.KerasTestCase, parameterized.TestCase
):

  @parameterized.named_parameters(
      {
          "testcase_name": "label_zero",
          "expected_loss": 0.0,
          "y_true": tf.constant([0], dtype=tf.float32),
      },
      {
          "testcase_name": "small_positive_label",
          "expected_loss": 0.0,
          "y_true": tf.constant([1e-10], dtype=tf.float32),
      },
      {
          "testcase_name": "label_one",
          "expected_loss": 1.0,
          "y_true": tf.constant([1], dtype=tf.float32),
      },
      {
          "testcase_name": "weighted_loss",
          "expected_loss": 1.0,
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "sample_weight": tf.constant([[0], [1]], dtype=tf.float32),
      },
      {
          "testcase_name": "two_tower_outputs",
          "expected_loss": 0.5 - 0.5 * tf.math.log(0.5),
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
      },
      {
          "testcase_name": "two_tower_outputs_sliced_loss",
          "expected_loss": {
              "loss": 0.5 - 0.5 * tf.math.log(0.5),
              "loss/control": 0.0,
              "loss/treatment": 1.0,
          },
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
          "slice_by_treatment": True,
      },
  )
  def test_metric_computes_correct_loss(
      self,
      expected_loss: tf.Tensor,
      y_true: tf.Tensor,
      y_pred: types.TwoTowerTrainingOutputs | tf.Tensor | None = None,
      sample_weight: tf.Tensor | None = None,
      slice_by_treatment: bool = False,
  ):
    metric = poisson_metrics.LogLossMeanBaseline(
        slice_by_treatment=slice_by_treatment, name="loss"
    )
    metric.update_state(y_true, y_pred, sample_weight=sample_weight)
    self.assertAllClose(expected_loss, metric.result())

  def test_negative_label_returns_nan_loss(self):
    metric = poisson_metrics.LogLossMeanBaseline(slice_by_treatment=False)
    metric.update_state(tf.constant([-1.0]))
    self.assertTrue(tf.math.is_nan(metric.result()).numpy().item())

  def test_metric_is_configurable(self):
    metric = poisson_metrics.LogLossMeanBaseline(slice_by_treatment=False)
    self.assertLayerConfigurable(
        layer=metric,
        y_true=tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
        serializable=True,
    )


class LogLossMinimumTest(keras_test_case.KerasTestCase, parameterized.TestCase):

  @parameterized.named_parameters(
      {
          "testcase_name": "label_zero",
          "expected_loss": 0.0,
          "y_true": tf.constant([0], dtype=tf.float32),
      },
      {
          "testcase_name": "small_positive_label",
          "expected_loss": 0.0,
          "y_true": tf.constant([1e-10], dtype=tf.float32),
      },
      {
          "testcase_name": "label_one",
          "expected_loss": 1.0,
          "y_true": tf.constant([1], dtype=tf.float32),
      },
      {
          "testcase_name": "weighted_loss",
          "expected_loss": 1.0,
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "sample_weight": tf.constant([[0], [1]], dtype=tf.float32),
      },
      {
          "testcase_name": "two_tower_outputs",
          "expected_loss": 0.5,
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
      },
      {
          "testcase_name": "two_tower_outputs_sliced_loss",
          "expected_loss": {
              "loss": 0.5,
              "loss/control": 0.0,
              "loss/treatment": 1.0,
          },
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
          "slice_by_treatment": True,
      },
  )
  def test_metric_computes_correct_loss(
      self,
      expected_loss: tf.Tensor,
      y_true: tf.Tensor,
      y_pred: types.TwoTowerTrainingOutputs | tf.Tensor | None = None,
      sample_weight: tf.Tensor | None = None,
      slice_by_treatment: bool = False,
  ):
    metric = poisson_metrics.LogLossMinimum(
        slice_by_treatment=slice_by_treatment, name="loss"
    )
    metric.update_state(y_true, y_pred, sample_weight=sample_weight)
    self.assertAllClose(expected_loss, metric.result())

  def test_negative_label_returns_nan_loss(self):
    metric = poisson_metrics.LogLossMinimum(slice_by_treatment=False)
    metric.update_state(tf.constant([-1.0]))
    self.assertTrue(tf.math.is_nan(metric.result()).numpy().item())

  def test_metric_is_configurable(self):
    metric = poisson_metrics.LogLossMinimum(slice_by_treatment=False)
    self.assertLayerConfigurable(
        layer=metric,
        y_true=tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
        serializable=True,
    )


class PseudoRSquaredTest(keras_test_case.KerasTestCase, parameterized.TestCase):

  @parameterized.named_parameters(
      {
          "testcase_name": "no_data",
          "expected_loss": 0.0,
          "y_true": tf.constant([], dtype=tf.float32),
          "y_pred": tf.constant([], dtype=tf.float32),
      },
      {
          "testcase_name": "one_correct_prediction",
          "expected_loss": 0.0,
          "y_true": tf.constant([1], dtype=tf.float32),
          "y_pred": tf.constant([1], dtype=tf.float32),
      },
      {
          "testcase_name": "one_wrong_prediction",
          "expected_loss": 0.0,  # LLmax and LLbaseline are equal.
          "y_true": tf.constant([0], dtype=tf.float32),
          "y_pred": tf.constant([1], dtype=tf.float32),
      },
      {
          "testcase_name": "all_correct_predictions",
          "expected_loss": 1.0,
          "y_true": tf.constant([[1], [2], [3]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [2], [3]], dtype=tf.float32),
          "from_logits": False,
      },
      {
          "testcase_name": "almost_correct_predictions",
          "expected_loss": 1.0,
          "y_true": tf.constant([[1], [2], [3]], dtype=tf.float32),
          "y_pred": tf.constant([[1], [1.9999], [3.0001]], dtype=tf.float32),
      },
      {
          "testcase_name": "from_logits",
          "expected_loss": (
              (tf.math.exp(1.0) / 2) - (0.5 - 0.5 * tf.math.log(0.5))
          ) / (0.5 - (0.5 - 0.5 * tf.math.log(0.5))),
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": tf.constant([[0], [1]], dtype=tf.float32),
          "from_logits": True,
      },
      {
          "testcase_name": "two_tower_outputs",
          "expected_loss": (
              ((tf.math.exp(1.0) - 1) + 1) / 2 - (0.5 - 0.5 * tf.math.log(0.5))
          ) / (0.5 - (0.5 - 0.5 * tf.math.log(0.5))),
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              true_logits=tf.constant([[0], [1]], dtype=tf.float32),
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
          "from_logits": True,
      },
      {
          "testcase_name": "two_tower_outputs_sliced_loss",
          "expected_loss": {
              "r2": (
                  ((tf.math.exp(1.0) - 1) + 1) / 2  # LLfit
                  - (0.5 - 0.5 * tf.math.log(0.5))  # LLbaseline
              ) / (0.5 - (0.5 - 0.5 * tf.math.log(0.5))),
              "r2/control": 0.0,
              "r2/treatment": 0.0,
          },
          "y_true": tf.constant([[0], [1]], dtype=tf.float32),
          "y_pred": _get_two_tower_outputs(
              true_logits=tf.constant([[0], [1]], dtype=tf.float32),
              is_treatment=tf.constant([[0], [1]], dtype=tf.float32),
          ),
          "from_logits": True,
          "slice_by_treatment": True,
      },
  )
  def test_metric_computation_is_correct(
      self,
      expected_loss: tf.Tensor,
      y_true: tf.Tensor,
      y_pred: types.TwoTowerTrainingOutputs | tf.Tensor,
      sample_weight: tf.Tensor | None = None,
      from_logits: bool = False,
      slice_by_treatment: bool = False,
  ):
    metric = poisson_metrics.PseudoRSquared(
        from_logits=from_logits,
        slice_by_treatment=slice_by_treatment,
        name="r2",
    )
    metric.update_state(y_true, y_pred, sample_weight=sample_weight)
    self.assertAllClose(expected_loss, metric.result())

  def test_slicing_raises_error_when_input_is_tensor(self):
    metric = poisson_metrics.PseudoRSquared()
    y_true = tf.constant([[0], [0], [2], [7]], dtype=tf.float32)
    y_pred = tf.constant([[1], [2], [3], [4]], dtype=tf.float32)
    with self.assertRaisesRegex(
        ValueError,
        "`slice_by_treatment` must be set to `False` when `y_pred` is not of"
        " type `TwoTowerTrainingOutputs`.",
    ):
      metric(y_true, y_pred)

  @parameterized.parameters(True, False)
  def test_metric_is_configurable(self, from_logits: bool):
    metric = poisson_metrics.PseudoRSquared(
        from_logits=from_logits, slice_by_treatment=False
    )
    self.assertLayerConfigurable(
        layer=metric,
        y_true=tf.constant([[0], [0], [2], [7]], dtype=tf.float32),
        y_pred=tf.constant([[1], [2], [3], [4]], dtype=tf.float32),
        serializable=True,
    )


if __name__ == "__main__":
  tf.test.main()