lenskit/metrics/topn.py
# This file is part of LensKit.
# Copyright (C) 2018-2023 Boise State University
# Copyright (C) 2023-2024 Drexel University
# Licensed under the MIT license, see LICENSE.md for details.
# SPDX-License-Identifier: MIT
"""
Top-N evaluation metrics.
"""
import logging
import warnings
import numpy as np
import pandas as pd
_log = logging.getLogger(__name__)
def bulk_impl(metric):
"""
Decorator to register a bulk implementation for a metric.
"""
def wrap(impl):
metric.bulk_score = impl
return impl
return wrap
def precision(recs, truth, k=None):
"""
Compute recommendation precision. This is computed as:
.. math::
\\frac{|L \\cap I_u^{\\mathrm{test}}|}{|L|}
In the uncommon case that ``k`` is specified and ``len(recs) < k``, this metric uses
``len(recs)`` as the denominator.
This metric has a bulk implementation.
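A minimal sketch with hypothetical data (items 1-3 recommended, item 2 in the
user's test set; ``truth`` is indexed by item, as this function expects):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 2, 3]})
>>> truth = pd.DataFrame({"rating": [5.0]}, index=pd.Index([2], name="item"))
>>> p = precision(recs, truth)  # 1 relevant of 3 recommended -> 1/3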
"""
if k is not None:
recs = recs.iloc[:k]
nrecs = len(recs)
if nrecs == 0:
return None
ngood = recs["item"].isin(truth.index).sum()
return ngood / nrecs
@bulk_impl(precision)
def _bulk_precision(recs, truth, k=None):
if k is not None:
recs = recs[recs["rank"] <= k]
lcounts = pd.Series(k, index=recs["LKRecID"].unique())
lcounts.index.name = "LKRecID"
else:
lcounts = recs.groupby(["LKRecID"])["item"].count()
good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
gcounts = good.groupby(["LKRecID"])["item"].count()
lcounts, gcounts = lcounts.align(gcounts, join="left", fill_value=0)
return gcounts / lcounts
def recall(recs, truth, k=None):
"""
Compute recommendation recall. This is computed as:
.. math::
\\frac{|L \\cap I_u^{\\mathrm{test}}|}{\\operatorname{min}\\{|I_u^{\\mathrm{test}}|, k\\}}
This metric has a bulk implementation.
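A minimal sketch with hypothetical data (four test items, one of which is
recommended in the top 3):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 2, 3]})
>>> truth = pd.DataFrame({"rating": [5.0, 4.0, 3.0, 2.0]}, index=pd.Index([2, 4, 5, 6], name="item"))
>>> r = recall(recs, truth, k=3)  # denominator is min(4, 3), so 1/3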
"""
nrel = len(truth)
if nrel == 0:
return None
if k is not None:
nrel = min(nrel, k)
recs = recs.iloc[:k]
ngood = recs["item"].isin(truth.index).sum()
return ngood / nrel
@bulk_impl(recall)
def _bulk_recall(recs, truth, k=None):
tcounts = truth.reset_index().groupby("LKTruthID")["item"].count()
if k is not None:
_log.debug("truncating to k for recall")
tcounts = np.minimum(tcounts, k)
recs = recs[recs["rank"] <= k]
good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
gcounts = good.groupby("LKRecID")["item"].count()
# we need all lists, because some might have no truth (oops), some no recs (also oops)
lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates()
scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left")
scores["ngood"].fillna(0, inplace=True)
scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left")
scores = scores.set_index("LKRecID")
return scores["ngood"] / scores["nrel"]
def hit(recs, truth, k=None):
"""
Compute whether or not a list is a hit; any list with at least one relevant item in the
first :math:`k` positions (:math:`L_{\\le k} \\cap I_u^{\\mathrm{test}} \\ne \\emptyset`)
is scored as 1, and lists with no relevant items as 0. When averaged over the recommendation
lists, this computes the *hit rate* :cite:p:`Deshpande2004-ht`.
This metric has a bulk implementation.
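A minimal sketch with hypothetical data (one relevant item at rank 2 makes the
list a hit):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 2, 3]})
>>> truth = pd.DataFrame({"rating": [5.0]}, index=pd.Index([2], name="item"))
>>> h = hit(recs, truth)  # at least one relevant item -> 1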
"""
if len(truth) == 0:
return None
if k is not None:
recs = recs.iloc[:k]
good = recs["item"].isin(truth.index)
if np.any(good):
return 1
else:
return 0
@bulk_impl(hit)
def _bulk_hit(recs, truth, k=None):
tcounts = truth.reset_index().groupby("LKTruthID")["item"].count()
if k is not None:
_log.debug("truncating to k for recall")
tcounts = np.minimum(tcounts, k)
recs = recs[recs["rank"] <= k]
good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
gcounts = good.groupby("LKRecID")["item"].count()
# we need all lists, because some might have no truth (oops), some no recs (also oops)
lists = recs[["LKRecID", "LKTruthID"]].drop_duplicates()
scores = lists.join(gcounts.to_frame("ngood"), on="LKRecID", how="left")
scores["ngood"].fillna(0, inplace=True)
scores = scores.join(tcounts.to_frame("nrel"), on="LKTruthID", how="left")
scores = scores.set_index("LKRecID")
good = scores["ngood"] > 0
good = good.astype("f4")
good[scores["nrel"] == 0] = np.nan
return good
def recip_rank(recs, truth, k=None):
"""
Compute the reciprocal rank :cite:p:`Kantor1997-lm` of the first relevant
item in a list of recommendations. Let :math:`\\kappa` denote the 1-based
rank of the first relevant item in :math:`L`, with :math:`\\kappa=\\infty`
if none of the first :math:`k` items in :math:`L` are relevant; then the
reciprocal rank is :math:`1 / \\kappa`. If no elements are relevant, the
reciprocal rank is therefore 0. :cite:t:`Deshpande2004-ht` call this the
“reciprocal hit rate”.
This metric has a bulk implementation.
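A minimal sketch with hypothetical data (the first relevant item is at rank 2):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 2, 3]})
>>> truth = pd.DataFrame({"rating": [5.0]}, index=pd.Index([2], name="item"))
>>> rr = recip_rank(recs, truth)  # kappa = 2, so 1/2 = 0.5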
"""
if k is not None:
recs = recs.iloc[:k]
good = recs["item"].isin(truth.index)
(npz,) = np.nonzero(good.to_numpy())
if len(npz):
return 1.0 / (npz[0] + 1.0)
else:
return 0.0
@bulk_impl(recip_rank)
def _bulk_rr(recs, truth, k=None):
# find everything with truth
if k is not None:
recs = recs[recs["rank"] <= k]
joined = recs.join(truth, on=["LKTruthID", "item"], how="inner")
# compute min ranks
ranks = joined.groupby("LKRecID")["rank"].agg("min")
# reciprocal ranks
scores = 1.0 / ranks
_log.debug("have %d scores with MRR %.3f", len(scores), scores.mean())
# fill with zeros
rec_ids = recs["LKRecID"].unique()
scores = scores.reindex(rec_ids, fill_value=0.0)
_log.debug("filled to get %s scores w/ MRR %.3f", len(scores), scores.mean())
# and we're done
return scores
def _dcg(scores, discount=np.log2):
"""
Compute the Discounted Cumulative Gain of a series of recommended items with rating scores.
These should be relevance scores; they can be :math:`\\{0, 1\\}` for binary relevance data.
This is not a true top-N metric, but is a utility function for other metrics.
Args:
scores(array-like):
The utility scores of a list of recommendations, in recommendation order.
discount(ufunc):
the rank discount function. Each item's score will be divided by the discount of its rank,
if the discount is greater than 1.
Returns:
double: the DCG of the scored items.
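A worked example: binary relevance ``[1, 0, 1]`` with the default log2 discount
scores :math:`1/1 + 0 + 1/\\log_2 3 \\approx 1.63`, since the discounts for
ranks 1 and 2 are clamped to 1:

>>> import numpy as np
>>> d = _dcg(np.array([1.0, 0.0, 1.0]))  # about 1.63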
"""
scores = np.nan_to_num(scores)
ranks = np.arange(1, len(scores) + 1)
disc = discount(ranks)
np.maximum(disc, 1, out=disc)
np.reciprocal(disc, out=disc)
return np.dot(scores, disc)
def _fixed_dcg(n, discount=np.log2):
"""Compute the DCG of ``n`` uniformly-relevant items, the ideal DCG for binary truth."""
ranks = np.arange(1, n + 1)
disc = discount(ranks)
disc = np.maximum(disc, 1)
disc = np.reciprocal(disc)
return np.sum(disc)
def dcg(recs, truth, discount=np.log2):
"""
Compute the **unnormalized** discounted cumulative gain :cite:p:`Jarvelin2002-xf`.
Discounted cumulative gain is computed as:
.. math::
\\begin{align*}
\\mathrm{DCG}(L,u) & = \\sum_{i=1}^{|L|} \\frac{r_{ui}}{d(i)}
\\end{align*}
Unrated items are assumed to have a utility of 0; if no rating values are provided in the
truth frame, item ratings are assumed to be 1.
Args:
recs: The recommendation list.
truth: The user's test data.
discount(ufunc):
The rank discount function. Each item's score will be divided by the discount of its rank,
if the discount is greater than 1.
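A minimal sketch with hypothetical graded ratings (item 9 is not in the truth
frame, so it contributes 0):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 9, 2]})
>>> truth = pd.DataFrame({"rating": [3.0, 4.0]}, index=pd.Index([1, 2], name="item"))
>>> g = dcg(recs, truth)  # 3/1 + 0 + 4/log2(3), about 5.52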
"""
tpos = truth.index.get_indexer(recs["item"])
tgood = tpos >= 0
if "rating" in truth.columns:
# make an array of ratings for this rec list
r_rates = truth["rating"].values[tpos]
r_rates[tpos < 0] = 0
achieved = _dcg(r_rates, discount)
else:
achieved = _dcg(tgood, discount)
return achieved
def ndcg(recs, truth, discount=np.log2, k=None):
"""
Compute the normalized discounted cumulative gain :cite:p:`ndcg`.
Discounted cumulative gain is computed as:
.. math::
\\begin{align*}
\\mathrm{DCG}(L,u) & = \\sum_{i=1}^{|L|} \\frac{r_{ui}}{d(i)}
\\end{align*}
Unrated items are assumed to have a utility of 0; if no rating values are provided in the
truth frame, item ratings are assumed to be 1.
This is then normalized as follows:
.. math::
\\begin{align*}
\\mathrm{nDCG}(L, u) & = \\frac{\\mathrm{DCG}(L,u)}{\\mathrm{DCG}(L_{\\mathrm{ideal}}, u)}
\\end{align*}
This metric has a bulk implementation.
Args:
recs: The recommendation list.
truth: The user's test data.
discount(numpy.ufunc):
The rank discount function. Each item's score will be divided by the discount of its rank,
if the discount is greater than 1.
k(int):
The maximum list length.
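A minimal sketch with hypothetical binary truth data (no ``rating`` column, so
every test item counts as 1):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 9, 2]})
>>> truth = pd.DataFrame(index=pd.Index([1, 2], name="item"))
>>> n = ndcg(recs, truth)  # (1 + 1/log2(3)) / 2, about 0.82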
"""
if k is not None:
recs = recs.iloc[:k]
tpos = truth.index.get_indexer(recs["item"])
if "rating" in truth.columns:
i_rates = np.sort(truth["rating"].values)[::-1]
if k is not None:
i_rates = i_rates[:k]
ideal = _dcg(i_rates, discount)
# make an array of ratings for this rec list
r_rates = truth["rating"].values[tpos]
r_rates[tpos < 0] = 0
achieved = _dcg(r_rates, discount)
else:
ntrue = len(truth)
if k is not None and ntrue > k:
ntrue = k
ideal = _fixed_dcg(ntrue, discount)
tgood = tpos >= 0
achieved = _dcg(tgood, discount)
return achieved / ideal
@bulk_impl(ndcg)
def _bulk_ndcg(recs, truth, discount=np.log2, k=None):
if "rating" not in truth.columns:
truth = truth.assign(rating=np.ones(len(truth), dtype=np.float32))
ideal = truth.groupby(level="LKTruthID")["rating"].rank(method="first", ascending=False)
if k is not None:
ideal = ideal[ideal <= k]
ideal = discount(ideal)
ideal = np.maximum(ideal, 1)
ideal = truth["rating"] / ideal
ideal = ideal.groupby(level="LKTruthID").sum()
ideal.name = "ideal"
list_ideal = recs[["LKRecID", "LKTruthID"]].drop_duplicates()
list_ideal = list_ideal.join(ideal, on="LKTruthID", how="left")
list_ideal = list_ideal.set_index("LKRecID")
if k is not None:
recs = recs[recs["rank"] <= k]
rated = recs.join(truth, on=["LKTruthID", "item"], how="inner")
rd = discount(rated["rank"])
rd = np.maximum(rd, 1)
rd = rated["rating"] / rd
rd = rated[["LKRecID"]].assign(util=rd)
dcg = rd.groupby(["LKRecID"])["util"].sum().reset_index(name="dcg")
dcg = dcg.set_index("LKRecID")
dcg = dcg.join(list_ideal, how="outer")
dcg["ndcg"] = dcg["dcg"].fillna(0) / dcg["ideal"]
return dcg["ndcg"]
def rbp(recs, truth, k=None, patience=0.5, normalize=False):
"""
Evaluate recommendations with rank-biased precision :cite:p:`rbp` with a
patience parameter :math:`\\gamma`.
If the ratings :math:`r_{ui} \\in \\{0, 1\\}` are binary relevance indicators, this is
computed by:
.. math::
\\begin{align*}
\\operatorname{RBP}_\\gamma(L, u) & = (1 - \\gamma) \\sum_i r_{ui} \\gamma^{i-1}
\\end{align*}
The original RBP metric relies on the fact that the rank-biased sum of binary
relevance scores in an infinitely long, perfectly precise list is
:math:`1/(1 - \\gamma)`. In recommender evaluation, however, we usually have a
small test set, so the maximum achievable RBP is significantly less, and is a
function of the number of test items. With ``normalize=True``, the RBP metric
will be normalized by the maximum achievable with the provided test data.
Parameters:
recs: the recommendation list.
truth: the user's truth data.
k(int): the maximum recommendation list length.
patience(float): the patience parameter :math:`\\gamma`, the probability
that the user continues browsing at each point.
normalize(bool): whether to normalize the RBP
scores; if ``True``, divides the RBP score by the maximum achievable
with the test data.
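A minimal sketch with hypothetical data (relevant items at ranks 1 and 3, with
the default patience of 0.5):

>>> import pandas as pd
>>> recs = pd.DataFrame({"item": [1, 9, 2]})
>>> truth = pd.DataFrame({"rating": [5.0, 4.0]}, index=pd.Index([1, 2], name="item"))
>>> s = rbp(recs, truth)  # 0.5 * (0.5**0 + 0.5**2) = 0.625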
"""
if k is not None and k <= len(recs):
recs = recs.iloc[:k]
else:
k = len(recs)
if "rank" not in recs.columns:
recs = recs.assign(rank=np.arange(1, len(recs) + 1))
if np.min(recs["rank"]) != 1:
warnings.warn("rank should start with 1")
nrel = len(truth)
if nrel == 0:
return None
good = recs["item"].isin(truth.index)
ranks = recs["rank"][good]
disc = patience ** (ranks - 1)
rbp = np.sum(disc)
if normalize:
# normalize by the maximum RBP achievable with this test data
max_rbp = np.sum(patience ** np.arange(min(nrel, k)))
# _log.info('rbp=%e, nrel=%d, eff=%d, max=%e', rbp, nrel, min(nrel, k), max_rbp)
return rbp / max_rbp
else:
# the standard RBP scaling by (1 - patience)
return rbp * (1 - patience)
@bulk_impl(rbp)
def _bulk_rbp(recs, truth, k=None, patience=0.5, normalize=False):
if k is not None:
recs = recs[recs["rank"] <= k]
good = recs.join(truth, on=["LKTruthID", "item"], how="inner")
good["rbp_disc"] = patience ** (good["rank"] - 1)
scores = good.groupby("LKRecID")["rbp_disc"].sum()
if normalize:
tns = truth.reset_index().groupby("LKTruthID")["item"].count()
if k is not None:
tns[tns > k] = k
max_nrel = np.max(tns)
# exponents 0 .. max_nrel - 1 (i.e. i - 1 for ranks i = 1 .. max_nrel)
kseq = np.arange(max_nrel)
# the rank discount at each exponent
nd = patience**kseq
# cumulative sums: the maximum achievable rank-biased sum for each truth-set size
max_rbps = pd.Series(np.cumsum(nd), index=kseq + 1)
# get a rec/truth mapping
lmap = recs[["LKRecID", "LKTruthID"]].drop_duplicates()
lmap = lmap.set_index("LKRecID")
lmap = lmap.reindex(scores.index)
# map each list to its truth-set size, and then to the max achievable RBP
lmap = lmap.join(tns.to_frame("nrel"), on="LKTruthID", how="left")
lmap = lmap.join(max_rbps.to_frame("rbp_max"), on="nrel", how="left")
# divide each score by the maximum achievable RBP
scores /= lmap["rbp_max"]
else:
scores *= 1 - patience
scores = scores.reindex(recs["LKRecID"].unique(), fill_value=0)
return scores