
# View on GitHub
#
#
# 6 hrs
# Test Coverage
# This file is part of LensKit.
# Copyright (C) 2018-2023 Boise State University
# Copyright (C) 2023-2024 Drexel University
# Licensed under the MIT license, see LICENSE.md for details.
# SPDX-License-Identifier: MIT

"""Top-N evaluation metrics."""

import logging
import warnings

import numpy as np
import pandas as pd

_log = logging.getLogger(__name__)

def bulk_impl(metric):
    """
    Decorator to register a bulk implementation for a metric.

    The decorated function is stored on ``metric`` as its ``bulk_score``
    attribute and is itself returned unchanged.

    Args:
        metric: the metric function to attach the bulk implementation to.

    Returns:
        A decorator that registers the function it is applied to.
    """

    def wrap(impl):
        metric.bulk_score = impl
        return impl

    return wrap

def precision(recs, truth, k=None):
    """
    Compute recommendation precision.  This is computed as:

    .. math::
        \\frac{|L \\cap I_u^{\\mathrm{test}}|}{|L|}

    In the uncommon case that ``k`` is specified and ``len(recs) < k``, this metric uses
    ``len(recs)`` as the denominator.

    This metric has a bulk implementation.

    Args:
        recs: the recommendation list, a frame with an ``item`` column; when ``k``
            is given only its first ``k`` rows are used.
        truth: the user's test data, indexed by item.
        k(int): the maximum recommendation list length (``None`` for no limit).

    Returns:
        The fraction of the (truncated) list that appears in ``truth``, or ``None``
        if the list is empty.
    """
    if k is not None:
        recs = recs.iloc[:k]

    nrecs = len(recs)
    if nrecs == 0:
        return None

    ngood = recs["item"].isin(truth.index).sum()
    return ngood / nrecs

def _bulk_precision(recs, truth, k=None):
    """
    Compute precision for many recommendation lists at once.

    The denominator for each list is its actual length after truncation to
    ``k``, which matches the documented behavior of :func:`precision` for lists
    shorter than ``k``.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns (``rank`` is only read when ``k`` is given).
        truth: frame of test data whose index is joined against the
            ``LKTruthID`` and ``item`` columns of ``recs``.
        k(int): the maximum rank to consider (``None`` for no limit).

    Returns:
        A series of precision values indexed by ``LKRecID``.
    """
    if k is not None:
        recs = recs[recs["rank"] <= k]

    # list lengths must be computed whether or not k was supplied
    lcounts = recs.groupby(["LKRecID"])["item"].count()

    good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
    gcounts = good.groupby(["LKRecID"])["item"].count()

    # lists with no relevant items are absent from gcounts; give them 0 hits
    lcounts, gcounts = lcounts.align(gcounts, join="left", fill_value=0)

    return gcounts / lcounts

def recall(recs, truth, k=None):
    """
    Compute recommendation recall.  This is computed as:

    .. math::
        \\frac{|L \\cap I_u^{\\mathrm{test}}|}{\\operatorname{min}\\{|I_u^{\\mathrm{test}}|, k\\}}

    This metric has a bulk implementation.

    Args:
        recs: the recommendation list, a frame with an ``item`` column; when ``k``
            is given only its first ``k`` rows are used.
        truth: the user's test data, indexed by item.
        k(int): the maximum recommendation list length (``None`` for no limit).

    Returns:
        The number of relevant recommended items divided by the number of
        achievable relevant items, or ``None`` if ``truth`` is empty.
    """
    nrel = len(truth)
    if nrel == 0:
        return None

    if k is not None:
        # a list of length k can retrieve at most k relevant items
        nrel = min(nrel, k)
        recs = recs.iloc[:k]

    ngood = recs["item"].isin(truth.index).sum()
    return ngood / nrel

def _bulk_recall(recs, truth, k=None):
    """
    Compute recall for many recommendation lists at once.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns (``rank`` is only read when ``k`` is given).
        truth: frame of test data indexed by ``LKTruthID`` and ``item``.
        k(int): the maximum rank to consider (``None`` for no limit).

    Returns:
        A series of recall values indexed by ``LKRecID``; lists whose truth ID
        has no test items get NaN.
    """
    tcounts = truth.reset_index().groupby("LKTruthID")["item"].count()

    if k is not None:
        _log.debug("truncating to k for recall")
        tcounts = np.minimum(tcounts, k)
        recs = recs[recs["rank"] <= k]

    good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
    gcounts = good.groupby("LKRecID")["item"].count()

    # every list present in recs must be scored, including those with no hits
    # and those whose truth ID has no test data
    lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates()

    scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left")
    # assign the result: a chained in-place fillna does not update the frame
    # when pandas Copy-on-Write is active
    scores["ngood"] = scores["ngood"].fillna(0)
    scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left")
    scores = scores.set_index("LKRecID")
    return scores["ngood"] / scores["nrel"]

def hit(recs, truth, k=None):
    """
    Compute whether or not a list is a hit; any list with at least one relevant item in the
    first :math:`k` positions (:math:`L_{\\le k} \\cap I_u^{\\mathrm{test}} \\ne \\emptyset`)
    is scored as 1, and lists with no relevant items as 0.  When averaged over the recommendation
    lists, this computes the *hit rate* :cite:p:`Deshpande2004-ht`.

    This metric has a bulk implementation.

    Args:
        recs: the recommendation list, a frame with an ``item`` column; when ``k``
            is given only its first ``k`` rows are used.
        truth: the user's test data, indexed by item.
        k(int): the maximum recommendation list length (``None`` for no limit).

    Returns:
        1 if the list contains a relevant item, 0 if it does not, or ``None`` if
        ``truth`` is empty.
    """

    if len(truth) == 0:
        return None

    if k is not None:
        recs = recs.iloc[:k]

    good = recs["item"].isin(truth.index)
    return 1 if np.any(good) else 0

def _bulk_hit(recs, truth, k=None):
    """
    Compute the hit metric for many recommendation lists at once.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns (``rank`` is only read when ``k`` is given).
        truth: frame of test data indexed by ``LKTruthID`` and ``item``.
        k(int): the maximum rank to consider (``None`` for no limit).

    Returns:
        A float32 series indexed by ``LKRecID``: 1 for lists with at least one
        relevant item, 0 for lists with none, and NaN for lists whose truth ID
        has no test items (mirroring the ``None`` returned by :func:`hit`).
    """
    tcounts = truth.reset_index().groupby("LKTruthID")["item"].count()

    if k is not None:
        _log.debug("truncating to k for hit")
        recs = recs[recs["rank"] <= k]

    good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
    gcounts = good.groupby("LKRecID")["item"].count()

    # every list present in recs must be scored, including those with no hits
    # and those whose truth ID has no test data
    lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates()

    scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left")
    # assign the result: a chained in-place fillna does not update the frame
    # when pandas Copy-on-Write is active
    scores["ngood"] = scores["ngood"].fillna(0)

    scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left")
    scores = scores.set_index("LKRecID")

    hits = (scores["ngood"] > 0).astype("f4")
    # the left join leaves nrel missing (not 0) for lists without truth data
    no_truth = scores["nrel"].isna() | (scores["nrel"] == 0)
    return hits.mask(no_truth)

def recip_rank(recs, truth, k=None):
    """
    Compute the reciprocal rank :cite:p:`Kantor1997-lm` of the first relevant
    item in a list of recommendations. Let :math:`\\kappa` denote the 1-based
    rank of the first relevant item in :math:`L`, with :math:`\\kappa=\\infty`
    if none of the first :math:`k` items in :math:`L` are relevant; then the
    reciprocal rank is :math:`1 / \\kappa`. If no elements are relevant, the
    reciprocal rank is therefore 0.  :cite:t:`Deshpande2004-ht` call this the
    “reciprocal hit rate”.

    This metric has a bulk equivalent.

    Args:
        recs: the recommendation list, a frame with an ``item`` column; row
            position is used as the rank, and when ``k`` is given only the
            first ``k`` rows are used.
        truth: the user's test data, indexed by item.
        k(int): the maximum recommendation list length (``None`` for no limit).

    Returns:
        The reciprocal of the 1-based position of the first relevant item, or
        0.0 if no relevant item is present.
    """
    if k is not None:
        recs = recs.iloc[:k]

    good = recs["item"].isin(truth.index)
    (npz,) = np.nonzero(good.to_numpy())
    if len(npz):
        # npz holds 0-based positions of relevant items, in list order
        return 1.0 / (npz[0] + 1.0)
    return 0.0

def _bulk_rr(recs, truth, k=None):
    """
    Compute reciprocal rank for many recommendation lists at once.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns.
        truth: frame of test data whose index is joined against the
            ``LKTruthID`` and ``item`` columns of ``recs``.
        k(int): the maximum rank to consider (``None`` for no limit).

    Returns:
        A series of reciprocal ranks, one per distinct ``LKRecID`` in the
        (truncated) recommendations; lists with no relevant item score 0.
    """
    if k is not None:
        recs = recs[recs["rank"] <= k]

    # keep only the recommended items that appear in the truth
    hits = recs.join(truth, on=["LKTruthID", "item"], how="inner")

    # the best (smallest) rank of a relevant item in each list
    first_hit = hits.groupby("LKRecID")["rank"].min()
    rr = 1.0 / first_hit
    _log.debug("have %d scores with MRR %.3f", len(rr), rr.mean())

    # lists without any relevant item are missing so far; they score zero
    all_lists = recs["LKRecID"].unique()
    rr = rr.reindex(all_lists, fill_value=0.0)
    _log.debug("filled to get %s scores w/ MRR %.3f", len(rr), rr.mean())

    return rr

def _dcg(scores, discount=np.log2):
    """
    Compute the Discounted Cumulative Gain of a series of recommended items with rating scores.
    These should be relevance scores; they can be :math:`{0,1}` for binary relevance data.

    This is not a true top-N metric, but is a utility function for other metrics.

    Args:
        scores(array-like):
            The utility scores of a list of recommendations, in recommendation order.
            NaN scores are treated as 0.
        discount(ufunc):
            the rank discount function.  Each item's score will be divided the discount of its rank,
            if the discount is greater than 1.

    Returns:
        double: the DCG of the scored items.
    """
    scores = np.nan_to_num(scores)
    ranks = np.arange(1, len(scores) + 1)
    # work in floating point on a fresh array: an integer-valued discount would
    # otherwise have its reciprocal truncated to 0, and the array returned by
    # the discount function must not be modified in place
    disc = np.asarray(discount(ranks), dtype=np.float64)
    weights = 1.0 / np.maximum(disc, 1)
    return np.dot(scores, weights)

def _fixed_dcg(n, discount=np.log2):
    """
    Compute the DCG of ``n`` items that all have a utility of 1.

    Args:
        n(int): the number of items.
        discount(ufunc):
            the rank discount function, applied to the ranks ``1..n``; discounts
            below 1 are treated as 1.

    Returns:
        double: the sum of the reciprocal discounts of ranks ``1..n``.
    """
    ranks = np.arange(1, n + 1)
    # coerce to float so an integer-valued discount is not truncated to 0 by
    # integer reciprocal
    disc = np.asarray(discount(ranks), dtype=np.float64)
    disc = np.maximum(disc, 1)
    return np.sum(1.0 / disc)

def dcg(recs, truth, discount=np.log2):
    """
    Compute the **unnormalized** discounted cumulative gain :cite:p:`Jarvelin2002-xf`.

    Discounted cumulative gain is computed as:

    .. math::
        \\mathrm{DCG}(L,u) & = \\sum_{i=1}^{|L|} \\frac{r_{ui}}{d(i)}

    Unrated items are assumed to have a utility of 0; if no rating values are provided in the
    truth frame, item ratings are assumed to be 1.

    Args:
        recs: The recommendation list.
        truth: The user's test data.
        discount(ufunc):
            The rank discount function.  Each item's score will be divided the discount of its rank,
            if the discount is greater than 1.

    Returns:
        The DCG of the recommendation list.
    """

    # position of each recommended item in the truth index, -1 if absent
    tpos = truth.index.get_indexer(recs["item"])
    tgood = tpos >= 0
    if "rating" in truth.columns:
        # make an array of ratings for this rec list; items absent from the
        # truth keep a utility of 0
        ratings = truth["rating"].to_numpy()
        r_rates = np.zeros(len(tpos), dtype=np.float64)
        r_rates[tgood] = ratings[tpos[tgood]]
        achieved = _dcg(r_rates, discount)
    else:
        # binary relevance: each relevant item has a utility of 1
        achieved = _dcg(tgood, discount)

    return achieved

def ndcg(recs, truth, discount=np.log2, k=None):
    """
    Compute the normalized discounted cumulative gain :cite:p:`ndcg`.

    Discounted cumulative gain is computed as:

    .. math::
        \\mathrm{DCG}(L,u) & = \\sum_{i=1}^{|L|} \\frac{r_{ui}}{d(i)}

    Unrated items are assumed to have a utility of 0; if no rating values are provided in the
    truth frame, item ratings are assumed to be 1.

    This is then normalized as follows:

    .. math::
        \\mathrm{nDCG}(L, u) & = \\frac{\\mathrm{DCG}(L,u)}{\\mathrm{DCG}(L_{\\mathrm{ideal}}, u)}

    This metric has a bulk implementation.

    Args:
        recs: The recommendation list.
        truth: The user's test data.
        discount(ufunc):
            The rank discount function.  Each item's score will be divided the discount of its rank,
            if the discount is greater than 1.
        k(int):
            The maximum list length.

    Returns:
        The DCG of the (truncated) list divided by the ideal DCG for the truth data.
    """

    if k is not None:
        recs = recs.iloc[:k]

    # position of each recommended item in the truth index, -1 if absent
    tpos = truth.index.get_indexer(recs["item"])
    tgood = tpos >= 0

    if "rating" in truth.columns:
        ratings = truth["rating"].to_numpy()
        # the ideal list ranks the test items by decreasing rating
        i_rates = np.sort(ratings)[::-1]
        if k is not None:
            i_rates = i_rates[:k]
        ideal = _dcg(i_rates, discount)
        # make an array of ratings for this rec list; items absent from the
        # truth keep a utility of 0
        r_rates = np.zeros(len(tpos), dtype=np.float64)
        r_rates[tgood] = ratings[tpos[tgood]]
        achieved = _dcg(r_rates, discount)
    else:
        # binary relevance: the ideal list is all test items, up to k of them
        ntrue = len(truth)
        if k is not None and ntrue > k:
            ntrue = k
        ideal = _fixed_dcg(ntrue, discount)
        achieved = _dcg(tgood, discount)

    return achieved / ideal

def _bulk_ndcg(recs, truth, discount=np.log2, k=None):
    """
    Compute nDCG for many recommendation lists at once.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns.
        truth: frame of test data indexed by ``LKTruthID`` and ``item``; if it
            has no ``rating`` column, every test item gets a rating of 1.
        discount(ufunc): the rank discount function; discounts below 1 are
            treated as 1.
        k(int): the maximum rank to consider (``None`` for no limit).

    Returns:
        A series named ``ndcg`` indexed by ``LKRecID``.
    """
    if "rating" not in truth.columns:
        truth = truth.assign(rating=np.ones(len(truth), dtype=np.float32))

    # ideal DCG per truth set: rank the test items by decreasing rating and
    # discount each rating by its ideal rank
    ideal_rank = truth.groupby(level="LKTruthID")["rating"].rank(method="first", ascending=False)
    if k is not None:
        ideal_rank = ideal_rank[ideal_rank <= k]
    ideal_disc = np.maximum(discount(ideal_rank), 1)
    # items dropped by the k filter align to NaN here and are skipped by sum()
    ideal_dcg = (truth["rating"] / ideal_disc).groupby(level="LKTruthID").sum()
    ideal_dcg.name = "ideal"

    # attach the ideal DCG to every list, looked up through its truth ID
    per_list = (
        recs[["LKRecID", "LKTruthID"]]
        .drop_duplicates()
        .join(ideal_dcg, on="LKTruthID", how="left")
        .set_index("LKRecID")
    )

    # achieved DCG per list, from the recommended items that are in the truth
    if k is not None:
        recs = recs[recs["rank"] <= k]
    hits = recs.join(truth, on=["LKTruthID", "item"], how="inner")
    hit_disc = np.maximum(discount(hits["rank"]), 1)
    utils = hits[["LKRecID"]].assign(util=hits["rating"] / hit_disc)
    achieved = utils.groupby(["LKRecID"])["util"].sum().reset_index(name="dcg")
    achieved = achieved.set_index("LKRecID")

    # lists without any hit have no achieved DCG; they count as 0
    result = achieved.join(per_list, how="outer")
    result["ndcg"] = result["dcg"].fillna(0) / result["ideal"]

    return result["ndcg"]

def rbp(recs, truth, k=None, patience=0.5, normalize=False):
    """
    Evaluate recommendations with rank-biased precision :cite:p:`rbp` with a
    patience parameter :math:`\\gamma`.

    If :math:`r_{ui} \\in \\{0, 1\\}` is binary implicit ratings, this is
    computed by (with :math:`i` the 1-based rank):

    .. math::
        \\operatorname{RBP}_\\gamma(L, u) & =(1 - \\gamma) \\sum_i r_{ui} \\gamma^{i-1}

    The original RBP metric depends on the idea that the rank-biased sum of
    binary relevance scores in an infinitely-long, perfectly-precise list is
    :math:`1/(1 - \\gamma)`. However, in recommender evaluation, we usually have
    a small test set, so the maximum achievable RBP is significantly less, and
    is a function of the number of test items.  With ``normalize=True``, the RBP
    metric will be normalized by the maximum achievable with the provided test
    data.

    Args:
        recs: the recommendation list.
        truth: the user's truth data.
        k(int): the maximum recommendation list length.
        patience(float): the patience parameter :math:`\\gamma`, the probability
            that the user continues browsing at each point.
        normalize(bool): whether to normalize the RBP
            scores; if ``True``, divides the RBP score by the maximum achievable
            with the test data.

    Returns:
        The RBP score, or ``None`` if ``truth`` is empty.
    """
    if k is not None and k <= len(recs):
        recs = recs.iloc[:k]
    else:
        # no usable limit: the effective list length is the whole list
        k = len(recs)

    if "rank" not in recs.columns:
        recs = recs.assign(rank=np.arange(1, len(recs) + 1))

    # an empty list has no minimum rank to check
    if len(recs) > 0 and np.min(recs["rank"]) != 1:
        warnings.warn("rank should start with 1")

    nrel = len(truth)
    if nrel == 0:
        return None

    good = recs["item"].isin(truth.index)
    ranks = recs["rank"][good]
    disc = patience ** (ranks - 1)
    score = np.sum(disc)
    if normalize:
        # normalize by achievable RBP: the best list puts min(nrel, k)
        # relevant items at the top ranks
        max_score = np.sum(patience ** np.arange(min(nrel, k)))
        return score / max_score
    else:
        # normal RBP normalization
        return score * (1 - patience)

def _bulk_rbp(recs, truth, k=None, patience=0.5, normalize=False):
    """
    Compute rank-biased precision for many recommendation lists at once.

    Args:
        recs: frame of recommendations with ``LKRecID``, ``LKTruthID``, ``item``,
            and ``rank`` columns.
        truth: frame of test data indexed by ``LKTruthID`` and ``item``.
        k(int): the maximum rank to consider (``None`` for no limit).
        patience(float): the patience parameter.
        normalize(bool): if ``True``, divide each list's discounted sum by the
            maximum achievable with its number of test items (capped at ``k``);
            otherwise multiply it by ``1 - patience``.

    Returns:
        A series of RBP scores, one per distinct ``LKRecID`` in the (truncated)
        recommendations; lists with no relevant item score 0.
    """
    if k is not None:
        recs = recs[recs["rank"] <= k]

    good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
    good = good.assign(rbp_disc=patience ** (good["rank"] - 1))
    scores = good.groupby("LKRecID")["rbp_disc"].sum()

    if normalize:
        # number of test items per truth set, capped at k
        tns = truth.reset_index().groupby("LKTruthID")["item"].count()
        if k is not None:
            tns = tns.clip(upper=k)
        max_nrel = int(tns.max()) if len(tns) else 0
        # compute 0...n-1 (the exponents for ranks 1..n)
        kseq = np.arange(max_nrel)
        # cumulative discount sums: entry n is the best score achievable with
        # n relevant items
        max_rbps = pd.Series(np.cumsum(patience**kseq), index=kseq + 1)

        # get a rec/truth mapping for the lists that have a score
        list_info = recs[["LKRecID", "LKTruthID"]].drop_duplicates().set_index("LKRecID")
        list_info = list_info.reindex(scores.index)
        # map to nrel, and then to the max RBPs
        list_info = list_info.join(tns.to_frame("nrel"), on="LKTruthID", how="left")
        list_info = list_info.join(max_rbps.to_frame("rbp_max"), on="nrel", how="left")

        # divide each score by max RBP
        scores = scores / list_info["rbp_max"]
    else:
        # normal RBP normalization
        scores = scores * (1 - patience)

    # lists without any relevant item are missing so far; they score zero
    return scores.reindex(recs["LKRecID"].unique(), fill_value=0)