lenskit/algorithms/item_knn.py
# This file is part of LensKit.
# Copyright (C) 2018-2023 Boise State University
# Copyright (C) 2023-2024 Drexel University
# Licensed under the MIT license, see LICENSE.md for details.
# SPDX-License-Identifier: MIT
"""
Item-based k-NN collaborative filtering.
"""
import logging
import warnings
from sys import intern
import csr.kernel as csrk
import numpy as np
import pandas as pd
import scipy.sparse as sps
import scipy.sparse.linalg as spla
from csr import CSR, create_empty, create_from_sizes
from numba import njit, prange
from numba.typed import List
from lenskit import ConfigWarning, DataWarning, util
from lenskit.data import sparse_ratings
from lenskit.sharing import in_share_context
from lenskit.util.accum import kvp_minheap_insert, kvp_minheap_sort
from lenskit.util.parallel import is_mp_worker
from . import Predictor
_logger = logging.getLogger(__name__)
def _make_blocks(n, size):
"Create blocks for the range 0..n."
return [(s, min(s + size, n)) for s in range(0, n, size)]
@njit(parallel=not is_mp_worker())
def _sort_nbrs(smat: CSR):
for i in prange(smat.nrows):
sp, ep = smat.row_extent(i)
kvp_minheap_sort(sp, ep, smat.colinds, smat.values)
@njit
def _trim_sim_block(nitems, bsp, bitems, block, min_sim, max_nbrs):
# pass 1: compute the size of each row
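    # `block` holds similarities with nitems rows and bitems columns; the
    # trimmed output has bitems rows, so counts accumulate per column r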
sizes = np.zeros(bitems, np.int32)
for i in range(nitems):
sp, ep = block.row_extent(i)
for j in range(sp, ep):
            # accept the neighbor if it passes the threshold and is not a self-similarity
r = block.colinds[j]
if i != bsp + r and block.values[j] >= min_sim:
sizes[r] += 1
if max_nbrs > 0:
for i in range(bitems):
if sizes[i] > max_nbrs:
sizes[i] = max_nbrs
# allocate a matrix
block_csr = create_from_sizes(bitems, nitems, sizes)
# pass 2: truncate each row into the matrix
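    # eps[r] tracks the current end of row r's neighbor heap as values are inserted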
eps = block_csr.rowptrs[:-1].copy()
    for c in range(nitems):
        sp, ep = block.row_extent(c)
        for j in range(sp, ep):
            v = block.values[j]
            r = block.colinds[j]
            rsp, rep = block_csr.row_extent(r)
            lim = rep - rsp
            if c != bsp + r and v >= min_sim:
                eps[r] = kvp_minheap_insert(
                    rsp, eps[r], lim, c, v, block_csr.colinds, block_csr.values
                )
# we're done!
return block_csr
@njit
def _sim_block(block, bsp, bep, rmh, min_sim, max_nbrs, nitems):
"Compute a single block of the similarity matrix"
# assert block.nrows == bep - bsp
bitems = block.nrows
if block.nnz == 0:
return create_empty(bitems, nitems)
# create a matrix handle for the subset matrix
amh = csrk.to_handle(block)
smh = csrk.mult_abt(rmh, amh)
csrk.release_handle(amh)
csrk.order_columns(smh)
block_csr = csrk.from_handle(smh)
    # the kernel handle can be released now that the block has been copied out
csrk.release_handle(smh)
block_csr = _trim_sim_block(nitems, bsp, bitems, block_csr, min_sim, max_nbrs)
return block_csr
@njit(nogil=True, parallel=not is_mp_worker())
def _sim_blocks(trmat, blocks, ptrs, min_sim, max_nbrs):
"Compute the similarity matrix with blocked CSR kernel calls"
nitems = trmat.nrows
nblocks = len(blocks)
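    # preallocate the result list with placeholder matrices so numba can
    # infer its element type before the parallel loop fills it in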
null = create_empty(1, 1)
res = [null for i in range(nblocks)]
rmat_h = csrk.to_handle(trmat)
csrk.order_columns(rmat_h)
for bi in prange(nblocks):
b = blocks[bi]
p = ptrs[bi]
bs, be = p
bres = _sim_block(b, bs, be, rmat_h, min_sim, max_nbrs, nitems)
res[bi] = bres
csrk.release_handle(rmat_h)
return res
@njit(nogil=True)
def _predict_weighted_average(model, nitems, nrange, ratings, rated, targets):
"Weighted average prediction function"
min_nbrs, max_nbrs = nrange
    scores = np.full(nitems, np.nan, dtype=np.float64)
for i in prange(targets.shape[0]):
iidx = targets[i]
rptr = model.rowptrs[iidx]
rend = model.rowptrs[iidx + 1]
num = 0
denom = 0
nnbrs = 0
for j in range(rptr, rend):
nidx = model.colinds[j]
if not rated[nidx]:
continue
nnbrs = nnbrs + 1
num = num + ratings[nidx] * model.values[j]
denom = denom + np.abs(model.values[j])
if max_nbrs > 0 and nnbrs >= max_nbrs:
break
if nnbrs < min_nbrs:
continue
scores[iidx] = num / denom
return scores
@njit(nogil=True)
def _predict_sum(model, nitems, nrange, ratings, rated, targets):
"Sum-of-similarities prediction function"
min_nbrs, max_nbrs = nrange
    scores = np.full(nitems, np.nan, dtype=np.float64)
for i in prange(targets.shape[0]):
iidx = targets[i]
rptr = model.rowptrs[iidx]
rend = model.rowptrs[iidx + 1]
score = 0
nnbrs = 0
for j in range(rptr, rend):
nidx = model.colinds[j]
if not rated[nidx]:
continue
nnbrs = nnbrs + 1
score = score + model.values[j]
if max_nbrs > 0 and nnbrs >= max_nbrs:
break
if nnbrs < min_nbrs:
continue
scores[iidx] = score
return scores
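# dispatch table mapping aggregate names to their JIT-compiled scoring kernels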
_predictors = {"weighted-average": _predict_weighted_average, "sum": _predict_sum}
class ItemItem(Predictor):
"""
Item-item nearest-neighbor collaborative filtering with ratings. This
item-item implementation is not terribly configurable; it hard-codes design
decisions found to work well in the previous Java-based LensKit code
:cite:p:`Ekstrand2011-bp`. This implementation is based on the description
of item-based CF by :cite:t:`Deshpande2004-ht`, and produces results
equivalent to Java LensKit.
    The k-NN predictor supports two aggregate functions:
``weighted-average``
The weighted average of the user's rating values, using item-item
similarities as weights.
``sum``
The sum of the similarities between the target item and the user's rated
items, regardless of the rating the user gave the items.
Args:
nnbrs(int):
the maximum number of neighbors for scoring each item (``None`` for
unlimited)
min_nbrs(int): the minimum number of neighbors for scoring each item
min_sim(float): minimum similarity threshold for considering a neighbor
        save_nbrs(int):
the number of neighbors to save per item in the trained model
(``None`` for unlimited)
feedback(str):
Control how feedback should be interpreted. Specifies defaults for
the other settings, which can be overridden individually; can be one
of the following values:
``explicit``
Configure for explicit-feedback mode: use rating values, center
ratings, and use the ``weighted-average`` aggregate method for
prediction. This is the default setting.
``implicit``
Configure for implicit-feedback mode: ignore rating values, do
not center ratings, and use the ``sum`` aggregate method for
prediction.
center(bool):
whether to normalize (mean-center) rating vectors prior to computing
similarities and aggregating user rating values. Defaults to
``True``; turn this off when working with unary data and other data
types that don't respond well to centering.
aggregate(str):
the type of aggregation to do. Can be ``weighted-average`` (the
default) or ``sum``.
use_ratings(bool):
whether or not to use the rating values. If ``False``, it ignores
rating values and considers an implicit feedback signal of 1 for
every (user,item) pair present.
Attributes:
item_index_(pandas.Index): the index of item IDs.
item_means_(numpy.ndarray): the mean rating for each known item.
        item_counts_(numpy.ndarray): the number of saved neighbors for each
            item.
        sim_matrix_(matrix.CSR): the similarity matrix.
        user_index_(pandas.Index): the index of known user IDs for the rating
            matrix.
        rating_matrix_(matrix.CSR): the user-item rating matrix for looking up
            users' ratings.
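    Example:
        A minimal usage sketch (``ratings`` stands for any data frame with
        ``user``, ``item``, and ``rating`` columns; the values shown are
        illustrative)::

            algo = ItemItem(20, save_nbrs=100)
            algo.fit(ratings)
            preds = algo.predict_for_user(42, [10, 20, 30])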
"""
IGNORED_PARAMS = ["feedback"]
EXTRA_PARAMS = ["center", "aggregate", "use_ratings"]
AGG_SUM = intern("sum")
AGG_WA = intern("weighted-average")
RATING_AGGS = [AGG_WA] # the aggregates that use rating values
def __init__(
self, nnbrs, min_nbrs=1, min_sim=1.0e-6, save_nbrs=None, feedback="explicit", **kwargs
):
self.nnbrs = nnbrs
        # normalize None (unlimited / default) to the sentinel values the
        # numeric code paths expect
        if self.nnbrs is None or self.nnbrs < 1:
            self.nnbrs = -1
        self.min_nbrs = min_nbrs
        if self.min_nbrs is None or self.min_nbrs < 1:
            self.min_nbrs = 1
self.min_sim = min_sim
self.save_nbrs = save_nbrs
if feedback == "explicit":
defaults = {"center": True, "aggregate": self.AGG_WA, "use_ratings": True}
elif feedback == "implicit":
defaults = {"center": False, "aggregate": self.AGG_SUM, "use_ratings": False}
else:
raise ValueError(f"invalid feedback mode: {feedback}")
defaults.update(kwargs)
self.center = defaults["center"]
self.aggregate = intern(defaults["aggregate"])
self.use_ratings = defaults["use_ratings"]
self._check_setup()
def _check_setup(self):
if not self.use_ratings:
if self.center:
_logger.warning(
"item-item configured to ignore ratings, but ``center=True`` - likely bug"
)
warnings.warn(
util.clean_str(
"""
item-item configured to ignore ratings, but ``center=True``. This configuration
is unlikely to work well.
"""
),
ConfigWarning,
)
            if self.aggregate == self.AGG_WA:
_logger.warning(
"item-item ignoring ratings but using weighted averages - likely bug"
)
warnings.warn(
util.clean_str(
"""
item-item ignoring ratings but using weighted averages. This configuration
is unlikely to work well.
"""
),
ConfigWarning,
)
def fit(self, ratings, **kwargs):
"""
Train a model.
The model-training process depends on ``save_nbrs`` and ``min_sim``, but *not* on other
algorithm parameters.
Args:
ratings(pandas.DataFrame):
(user,item,rating) data for computing item similarities.
"""
util.check_env()
# Training proceeds in 2 steps:
# 1. Normalize item vectors to be mean-centered and unit-normalized
# 2. Compute similarities with pairwise dot products
self._timer = util.Stopwatch()
_logger.debug("[%s] beginning fit, memory use %s", self._timer, util.max_memory())
_logger.debug("[%s] using CSR kernel %s", self._timer, csrk.name)
init_rmat, users, items = sparse_ratings(ratings)
n_items = len(items)
_logger.info(
"[%s] made sparse matrix for %d items (%d ratings from %d users)",
self._timer,
len(items),
init_rmat.nnz,
len(users),
)
_logger.debug("[%s] made matrix, memory use %s", self._timer, util.max_memory())
rmat, item_means = self._mean_center(ratings, init_rmat, items)
_logger.debug("[%s] centered, memory use %s", self._timer, util.max_memory())
rmat = self._normalize(rmat)
_logger.debug("[%s] normalized, memory use %s", self._timer, util.max_memory())
_logger.info("[%s] computing similarity matrix", self._timer)
smat = self._compute_similarities(rmat)
_logger.debug("[%s] computed, memory use %s", self._timer, util.max_memory())
_logger.info(
"[%s] got neighborhoods for %d of %d items",
self._timer,
np.sum(np.diff(smat.rowptrs) > 0),
n_items,
)
_logger.info("[%s] computed %d neighbor pairs", self._timer, smat.nnz)
self.item_index_ = items
self.item_means_ = item_means
self.item_counts_ = np.diff(smat.rowptrs)
self.sim_matrix_ = smat
self.user_index_ = users
self.rating_matrix_ = init_rmat
# create an inverted similarity matrix for efficient scanning
self._sim_inv_ = smat.transpose()
_logger.info("[%s] transposed matrix for optimization", self._timer)
_logger.debug("[%s] done, memory use %s", self._timer, util.max_memory())
return self
def _mean_center(self, ratings, rmat, items):
if not self.center:
return rmat, None
item_means = ratings.groupby("item").rating.mean()
item_means = item_means.reindex(items).values
mcvals = rmat.values - item_means[rmat.colinds]
nmat = rmat.copy(False)
nmat.values = mcvals
if np.allclose(nmat.values, 0):
            _logger.warning("all centered ratings are zero; centering is not recommended")
warnings.warn(
"Ratings seem to have the same value, centering is not recommended.", DataWarning
)
_logger.info("[%s] computed means for %d items", self._timer, len(item_means))
return nmat, item_means
def _normalize(self, rmat):
rmat = rmat.to_scipy()
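        # unit-normalize each item column so that the dot products taken in
        # _compute_similarities are cosine similarities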
# compute column norms
norms = spla.norm(rmat, 2, axis=0)
# and multiply by a diagonal to normalize columns
recip_norms = norms.copy()
is_nz = recip_norms > 0
recip_norms[is_nz] = np.reciprocal(recip_norms[is_nz])
norm_mat = rmat @ sps.diags(recip_norms)
assert norm_mat.shape[1] == rmat.shape[1]
# and reset NaN
norm_mat.data[np.isnan(norm_mat.data)] = 0
_logger.info("[%s] normalized rating matrix columns", self._timer)
return CSR.from_scipy(norm_mat, False)
def _compute_similarities(self, rmat):
trmat = rmat.transpose()
nitems = trmat.nrows
m_nbrs = self.save_nbrs
if m_nbrs is None or m_nbrs < 0:
m_nbrs = 0
bounds = _make_blocks(nitems, 1000)
_logger.info(
"[%s] splitting %d items (%d ratings) into %d blocks",
self._timer,
nitems,
trmat.nnz,
len(bounds),
)
blocks = [trmat.subset_rows(sp, ep) for (sp, ep) in bounds]
_logger.info("[%s] computing similarities", self._timer)
ptrs = List(bounds)
nbs = List(blocks)
if not nbs:
            # in non-JIT mode, List() does not actually build a typed list,
            # so fall back to the plain Python lists
nbs = blocks
ptrs = bounds
s_blocks = _sim_blocks(trmat, nbs, ptrs, self.min_sim, m_nbrs)
nnz = sum(b.nnz for b in s_blocks)
tot_rows = sum(b.nrows for b in s_blocks)
_logger.info(
"[%s] computed %d similarities for %d items in %d blocks",
self._timer,
nnz,
tot_rows,
len(s_blocks),
)
row_nnzs = np.concatenate([b.row_nnzs() for b in s_blocks])
assert len(row_nnzs) == nitems, "only have {} rows for {} items".format(
len(row_nnzs), nitems
)
smat = CSR.empty(nitems, nitems, row_nnzs)
start = 0
for bi, b in enumerate(s_blocks):
bnr = b.nrows
end = start + bnr
v_sp = smat.rowptrs[start]
v_ep = smat.rowptrs[end]
_logger.debug(
"block %d (%d:%d) has %d entries, storing in %d:%d",
bi,
start,
end,
b.nnz,
v_sp,
v_ep,
)
smat.colinds[v_sp:v_ep] = b.colinds
smat.values[v_sp:v_ep] = b.values
start = end
_logger.info("[%s] sorting similarity matrix with %d entries", self._timer, smat.nnz)
_sort_nbrs(smat)
return smat
def predict_for_user(self, user, items, ratings=None):
_logger.debug("predicting %d items for user %s", len(items), user)
if ratings is None:
if user not in self.user_index_:
_logger.debug("user %s missing, returning empty predictions", user)
return pd.Series(np.nan, index=items)
upos = self.user_index_.get_loc(user)
ratings = pd.Series(
self.rating_matrix_.row_vs(upos),
index=pd.Index(self.item_index_[self.rating_matrix_.row_cs(upos)]),
)
if not ratings.index.is_unique:
            wmsg = "user {} has duplicate ratings, which is likely to cause problems".format(user)
warnings.warn(wmsg, DataWarning)
# set up rating array
# get rated item positions & limit to in-model items
n_items = len(self.item_index_)
ri_pos = self.item_index_.get_indexer(ratings.index)
m_rates = ratings[ri_pos >= 0]
ri_pos = ri_pos[ri_pos >= 0]
        rate_v = np.full(n_items, np.nan, dtype=np.float64)
rated = np.zeros(n_items, dtype="bool")
# mean-center the rating array
if self.center:
rate_v[ri_pos] = m_rates.values - self.item_means_[ri_pos]
else:
rate_v[ri_pos] = m_rates.values
rated[ri_pos] = True
_logger.debug("user %s: %d of %d rated items in model", user, len(ri_pos), len(ratings))
assert np.sum(np.logical_not(np.isnan(rate_v))) == len(ri_pos)
assert np.all(np.isnan(rate_v) == np.logical_not(rated))
# set up item result vector
# ipos will be an array of item indices
i_pos = self.item_index_.get_indexer(items)
i_pos = i_pos[i_pos >= 0]
_logger.debug("user %s: %d of %d requested items in model", user, len(i_pos), len(items))
# now we take a first pass through the data to count _viable_ targets
# This computes the number of neighbors (and their weight sum) for
# each target item based on the user's ratings, allowing us to fast-path
# other computations and avoid as many neighbor truncations as possible
i_cts, i_sums, i_nbrs = self._count_viable_targets(i_pos, ri_pos)
viable = i_cts >= self.min_nbrs
i_pos = i_pos[viable]
i_cts = i_cts[viable]
i_sums = i_sums[viable]
i_nbrs = i_nbrs[viable]
_logger.debug(
"user %s: %d of %d requested items possibly reachable", user, len(i_pos), len(items)
)
# look for some fast paths
if self.aggregate == self.AGG_SUM and self.min_sim >= 0:
# similarity sums are all we need
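            # with nonnegative similarities and no more than nnbrs rated
            # neighbors, no truncation can occur, so the precomputed sums
            # are already the final scores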
if self.nnbrs >= 0:
fast_mask = i_cts <= self.nnbrs
fast_items = i_pos[fast_mask]
fast_scores = i_sums[fast_mask]
slow_items = i_pos[~fast_mask]
else:
fast_items = i_pos
fast_scores = i_sums
slow_items = np.array([], dtype="i4")
_logger.debug(
"user %s: using fast-path similarity sum for %d items", user, len(fast_items)
)
if len(slow_items):
iscores = _predict_sum(
self.sim_matrix_,
len(self.item_index_),
(self.min_nbrs, self.nnbrs),
rate_v,
rated,
slow_items,
)
else:
iscores = np.full(len(self.item_index_), np.nan)
iscores[fast_items] = fast_scores
elif self.aggregate == self.AGG_WA and self.min_nbrs == 1:
# fast-path single-neighbor targets - common in sparse data
fast_mask = i_cts == 1
fast_items = i_pos[fast_mask]
fast_scores = rate_v[i_nbrs[fast_mask]]
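            # with exactly one neighbor, the weighted average collapses to
            # r * sim / |sim| = r * sign(sim); the sign correction below only
            # matters when negative similarities are kept (min_sim < 0)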
if self.min_sim < 0:
fast_scores *= np.sign(i_sums[fast_mask])
_logger.debug("user %s: fast-pathed %d scores", user, len(fast_scores))
slow_items = i_pos[i_cts > 1]
iscores = _predict_weighted_average(
self.sim_matrix_,
len(self.item_index_),
(self.min_nbrs, self.nnbrs),
rate_v,
rated,
slow_items,
)
iscores[fast_items] = fast_scores
else:
# now compute the predictions
_logger.debug("user %s: taking the slow path", user)
agg = _predictors[self.aggregate]
iscores = agg(
self.sim_matrix_,
len(self.item_index_),
(self.min_nbrs, self.nnbrs),
rate_v,
rated,
i_pos,
)
if self.center and self.aggregate in self.RATING_AGGS:
iscores += self.item_means_
results = pd.Series(iscores, index=self.item_index_)
results = results.reindex(items, fill_value=np.nan)
_logger.debug(
"user %s: predicted for %d of %d items", user, results.notna().sum(), len(items)
)
return results
def _count_viable_targets(self, targets, rated):
"Count upper-bound on possible neighbors for target items and rated items."
# initialize counts to zero
counts = np.zeros(len(self.item_index_), dtype=np.int32)
sums = np.zeros(len(self.item_index_))
last_nbrs = np.full(len(self.item_index_), -1, "i4")
# count the number of times each item is reachable from the neighborhood
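        # row ri of the transposed similarity matrix lists every item that
        # keeps ri among its saved neighbors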
for ri in rated:
nbrs = self._sim_inv_.row_cs(ri)
counts[nbrs] += 1
sums[nbrs] += self._sim_inv_.row_vs(ri)
last_nbrs[nbrs] = ri
# we want the reachability counts for the target items
return counts[targets], sums[targets], last_nbrs[targets]
def __getstate__(self):
state = dict(self.__dict__)
if "_sim_inv_" in state and not in_share_context():
del state["_sim_inv_"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
if hasattr(self, "sim_matrix_") and not hasattr(self, "_sim_inv_"):
self._sim_inv_ = self.sim_matrix_.transpose()
def __str__(self):
return "ItemItem(nnbrs={}, msize={})".format(self.nnbrs, self.save_nbrs)