lenskit/lkpy

View on GitHub
lenskit/crossfold.py

Summary

Maintainability
A
0 mins
Test Coverage
# This file is part of LensKit.
# Copyright (C) 2018-2023 Boise State University
# Copyright (C) 2023-2024 Drexel University
# Licensed under the MIT license, see LICENSE.md for details.
# SPDX-License-Identifier: MIT

"""
Data set cross-folding.
"""

import logging
from abc import ABC, abstractmethod
from collections import namedtuple

import numpy as np
import pandas as pd
from seedbank import numpy_rng

TTPair = namedtuple("TTPair", ["train", "test"])
TTPair.__doc__ = "Train-test pair (named tuple)."
TTPair.train.__doc__ = "Train data for this pair."
TTPair.test.__doc__ = "Test data for this pair."

_logger = logging.getLogger(__name__)


def partition_rows(data, partitions, *, rng_spec=None):
    """
    Partition a frame of ratings or other datainto train-test partitions.  This function does not
    care what kind of data is in `data`, so long as it is a Pandas DataFrame (or equivalent).

    Args:
        data(pandas.DataFrame):
            Ratings or other data you wish to partition.
        partitions(int):
            The number of partitions to produce.
        rng_spec:
            The random number generator or seed (see :py:func:`seedbank.numpy_rng`).

    Returns:
        iterator: an iterator of train-test pairs
    """

    confirm_unique_index(data)
    _logger.info("partitioning %d ratings into %d partitions", len(data), partitions)

    # create an array of indexes
    rows = np.arange(len(data))
    # shuffle the indices & split into partitions
    rng = numpy_rng(rng_spec)
    rng.shuffle(rows)
    test_sets = np.array_split(rows, partitions)

    # convert each partition into a split
    for i, ts in enumerate(test_sets):
        test = data.iloc[ts, :]
        trains = test_sets[:i] + test_sets[(i + 1) :]
        train_idx = np.concatenate(trains)
        train = data.iloc[train_idx, :]
        yield TTPair(train, test)


def sample_rows(data, partitions, size, disjoint=True, *, rng_spec=None):
    """
    Sample train-test a frame of ratings into train-test partitions.  This function does not care
    what kind of data is in `data`, so long as it is a Pandas DataFrame (or equivalent).

    We can loop over a sequence of train-test pairs::

        >>> from lenskit import datasets
        >>> ratings = datasets.MovieLens('data/ml-latest-small').ratings
        >>> for train, test in sample_rows(ratings, 5, 1000):
        ...     print(len(test))
        1000
        1000
        1000
        1000
        1000

    Sometimes for testing, it is useful to just get a single pair::

        >>> train, test = sample_rows(ratings, None, 1000)
        >>> len(test)
        1000
        >>> len(test) + len(train) - len(ratings)
        0

    Args:
        data(pandas.DataFrame):
            Data frame containing ratings or other data to partition.
        partitions(int or None):
            The number of partitions to produce.  If ``None``, produce a _single_ train-test
            pair instead of an iterator or list.
        size(int):
            The size of each sample.
        disjoint(bool):
            If ``True``, force samples to be disjoint.
        rng_spec:
            The random number generator or seed (see :py:func:`seedbank.numpy_rng`).

    Returns:
        iterator: An iterator of train-test pairs.
    """

    confirm_unique_index(data)
    rng = numpy_rng(rng_spec)
    if partitions is None:
        test = data.sample(n=size, random_state=rng)
        tr_mask = pd.Series(True, index=data.index)
        tr_mask.loc[test.index] = False
        train = data[tr_mask]
        return TTPair(train, test)

    if disjoint and partitions * size >= len(data):
        _logger.warning(
            "wanted %d disjoint splits of %d each, but only have %d rows; partitioning",
            partitions,
            size,
            len(data),
        )
        return partition_rows(data, partitions, rng_spec=rng)

    # create an array of indexes
    rows = np.arange(len(data))

    if disjoint:
        _logger.info("creating %d disjoint samples of size %d", partitions, size)
        ips = _disjoint_sample(rows, partitions, size, rng)

    else:
        _logger.info("taking %d samples of size %d", partitions, size)
        ips = _n_samples(rows, partitions, size, rng)

    return (TTPair(data.iloc[ip.train, :], data.iloc[ip.test, :]) for ip in ips)


def _disjoint_sample(xs, n, size, rng):
    # shuffle the indices & split into partitions
    rng.shuffle(xs)

    # convert each partition into a split
    for i in range(n):
        start = i * size
        test = xs[start : start + size]
        train = np.concatenate((xs[:start], xs[start + size :]))
        yield TTPair(train, test)


def _n_samples(xs, n, size, rng):
    for i in range(n):
        test = rng.choice(xs, size, False)
        train = np.setdiff1d(xs, test, assume_unique=True)
        yield TTPair(train, test)


class PartitionMethod(ABC):
    """
    Partition methods select test rows for a user or item.  Partition methods
    are callable; when called with a data frame, they return the test rows.
    """

    @abstractmethod
    def __call__(self, udf):
        """
        Subset a data frame.

        Args:
            udf(pandas.DataFrame):
                The input data frame of rows for a user or item.

        Returns:
            pandas.DataFrame:
                The data frame of test rows, a subset of ``udf``.
        """
        pass


class SampleN(PartitionMethod):
    """
    Randomly select a fixed number of test rows per user/item.

    Args:
        n(int): the number of test items to select
        rng: the random number generator or seed
    """

    def __init__(self, n, rng_spec=None):
        self.n = n
        self.rng = numpy_rng(rng_spec)

    def __call__(self, udf):
        return udf.sample(n=self.n, random_state=self.rng)


class SampleFrac(PartitionMethod):
    """
    Randomly select a fraction of test rows per user/item.

    Args:
        frac(float): the fraction items to select for testing.
    """

    def __init__(self, frac, rng_spec=None):
        self.fraction = frac
        self.rng = numpy_rng(rng_spec)

    def __call__(self, udf):
        return udf.sample(frac=self.fraction, random_state=self.rng)


class LastN(PartitionMethod):
    """
    Select a fixed number of test rows per user/item, based on ordering by a
    column.

    Args:
        n(int): The number of test items to select.
    """

    def __init__(self, n, col="timestamp"):
        self.n = n
        self.column = col

    def __call__(self, udf):
        return udf.sort_values(self.column).iloc[-self.n :]


class LastFrac(PartitionMethod):
    """
    Select a fraction of test rows per user/item.

    Args:
        frac(double): the fraction of items to select for testing.
    """

    def __init__(self, frac, col="timestamp"):
        self.fraction = frac
        self.column = col

    def __call__(self, udf):
        n = round(len(udf) * self.fraction)
        return udf.sort_values(self.column).iloc[-n:]


def partition_users(data, partitions: int, method: PartitionMethod, *, rng_spec=None):
    """
    Partition a frame of ratings or other data into train-test partitions user-by-user.
    This function does not care what kind of data is in `data`, so long as it is a Pandas DataFrame
    (or equivalent) and has a `user` column.

    Args:
        data(pandas.DataFrame): a data frame containing ratings or other data you wish to partition.
        partitions(int): the number of partitions to produce
        method(PartitionMethod): The method for selecting test rows for each user.
        rng_spec: The random number generator or seed (see :py:func:`seedbank.numpy_rng`).

    Returns
        iterator: an iterator of train-test pairs
    """

    confirm_unique_index(data)
    user_col = data["user"]
    users = user_col.unique()
    _logger.info(
        "partitioning %d rows for %d users into %d partitions", len(data), len(users), partitions
    )

    # create an array of indexes into user row
    rows = np.arange(len(users))
    # shuffle the indices & split into partitions
    rng = numpy_rng(rng_spec)
    rng.shuffle(rows)
    test_sets = np.array_split(rows, partitions)

    # convert each partition into a split
    for i, ts in enumerate(test_sets):
        # get our users!
        test_us = users[ts]
        # sample the data frame
        _logger.info("fold %d: selecting test ratings", i)
        ugf = data[data.user.isin(test_us)].groupby("user")
        test = ugf.apply(method)
        # get rid of the group index
        test = test.reset_index(0, drop=True)
        # now test is indexed on the data frame! so we can get the rest
        _logger.info("fold %d: partitioning training data", i)
        mask = pd.Series(True, index=data.index)
        mask[test.index] = False
        train = data[mask]
        yield TTPair(train, test)


def sample_users(
    data, partitions: int, size: int, method: PartitionMethod, disjoint=True, *, rng_spec=None
):
    """
    Create train-test partitions by sampling users.
    This function does not care what kind of data is in `data`, so long as it is
    a Pandas DataFrame (or equivalent) and has a `user` column.

    Args:
        data(pandas.DataFrame):
            Data frame containing ratings or other data you wish to partition.
        partitions(int):
            The number of partitions.
        size(int):
            The sample size.
        method(PartitionMethod):
            The method for obtaining user test ratings.
        rng_spec:
            The random number generator or seed (see :py:func:`seedbank.numpy_rng`).

    Returns:
        iterator: An iterator of train-test pairs (as :class:`TTPair` objects).
    """

    confirm_unique_index(data)
    rng = numpy_rng(rng_spec)

    user_col = data["user"]
    users = user_col.unique()
    if disjoint and partitions * size >= len(users):
        _logger.warning(
            "cannot take %d disjoint samples of size %d from %d users", partitions, size, len(users)
        )
        yield from partition_users(data, partitions, method)
        return

    _logger.info("sampling %d users into %d partitions (n=%d)", len(users), partitions, size)

    if disjoint:
        rng.shuffle(users)

    # generate our samples
    for i in range(partitions):
        # get our test users!
        if disjoint:
            test_us = users[i * size : (i + 1) * size]
        else:
            test_us = rng.choice(users, size, False)

        # sample the data frame
        test = data[data.user.isin(test_us)].groupby("user").apply(method)
        # get rid of the group index
        test = test.reset_index(0, drop=True)
        # now test is indexed on the data frame! so we can get the rest
        rest = data.index.difference(test.index)
        train = data.loc[rest]
        yield TTPair(train, test)


def simple_test_pair(ratings, n_users=1000, n_rates=5, f_rates=None):
    """
    Return a single, basic train-test pair for some ratings.  This is only intended
    for convenience use in test and demos - do not use for research.
    """

    if f_rates:
        samp = SampleFrac(f_rates)
    else:
        samp = SampleN(n_rates)

    train, test = next(sample_users(ratings, 1, n_users, samp))

    return train, test


def confirm_unique_index(data):
    """Confirms dataframe has unique index values, and if not,
    throws ValueError with helpful log message"""

    if not data.index.is_unique:
        _logger.error("Index has duplicate values")
        _logger.info(
            "If index values do not matter, consider running "
            + ".reset_index() on the dataframe before partitioning"
        )
        raise ValueError("Index is not uniquely valued")