fabiommendes/sidekick

View on GitHub
sidekick-seq/sidekick/seq/lib_reducers.py

Summary

Maintainability
A
0 mins
Test Coverage
import warnings
from functools import reduce as _reduce

from .iter import Iter, generator
from .lib_basic import uncons
from .._toolz import accumulate as _accumulate, topk as _topk, reduceby
from ..functions import fn, to_callable
from ..typing import Func, Seq, Pred, TYPE_CHECKING, NOT_GIVEN

if TYPE_CHECKING:
    from .. import api as sk  # noqa: F401
    from ..api import X, Y  # noqa: F401

# Keep a handle on the builtin ``sum`` before this module defines its own
# public ``sum`` function further down, which delegates to this alias.
_sum = sum


#
# Basic reductions and folds
#
@fn.curry(3)
def fold(func: Func, init, seq: Seq):
    """
    Left-fold a sequence into a single value, starting from ``init``.

    The reducer is normalized with ``to_callable`` and then handed to
    ``functools.reduce`` together with ``init`` as the starting accumulator.

    Args:
        func: Binary reducer, called as ``func(accumulated, item)``.
        init: Starting value of the accumulator.
        seq: Sequence to consume.

    Returns:
        The final accumulated value.

    Examples:
        >>> sk.fold(op.add, 0, [1, 2, 3, 4])
        10

    See Also:
        :func:`reduce`
        :func:`scan`
    """
    return _reduce(to_callable(func), seq, init)


@fn.curry(2)
def reduce(func: Func, seq: Seq, initial=NOT_GIVEN):
    """
    Like fold, but does not require initial value.

    The reduction is delegated to ``functools.reduce`` without a starting
    value, so the first item of the sequence seeds the accumulator.

    Passing ``initial`` is still accepted, but it emits a warning and simply
    forwards to :func:`fold`.

    Args:
        func: Binary reducer, called as ``func(accumulated, item)``.
        seq: Sequence to consume.
        initial: Optional starting value; prefer :func:`fold` instead.

    Returns:
        The final accumulated value.

    Raises:
        TypeError: If ``seq`` is empty and no ``initial`` is given. This is
            the error raised by ``functools.reduce`` itself.

    Examples:
        >>> sk.reduce(op.add, [1, 2, 3, 4])
        10

    See Also:
        :func:`fold`
        :func:`acc`
    """
    if initial is not NOT_GIVEN:
        warnings.warn("use the sk.fold() function to set initial values.")
        return fold(func, initial, seq)

    func = to_callable(func)
    return _reduce(func, seq)


@fn.curry(2)
def acc(func: Func, seq: Seq) -> Iter:
    """
    Accumulate ``seq`` with ``func``, seeding from the sequence itself.

    Behaves like :func:`scan`, except that no explicit initial value is
    supplied to the underlying accumulate helper.

    Args:
        func: Binary reducer; normalized with ``to_callable`` before use.
        seq: Sequence to consume.

    Returns:
        An ``Iter`` wrapping the stream of intermediate results.

    See Also:
        :func:`scan`
        :func:`reduce`
    """
    partials = _accumulate(to_callable(func), seq)
    return Iter(partials)


@fn.curry(3)
def scan(func: Func, init, seq: Seq) -> Iter:
    """
    Produce the running results of folding ``seq`` with ``func``.

    Each emitted value is obtained by combining the previous result with
    the next item of the sequence, along the lines of:

        func(init, seq[0]), func(result[0], seq[1]), ...

    where result[i] denotes items of the resulting sequence.

    NOTE(review): whether ``init`` itself is emitted as the very first item
    is decided by the vendored accumulate helper -- confirm against it.

    Args:
        func: Binary reducer; normalized with ``to_callable`` before use.
        init: Starting value handed to the accumulate helper.
        seq: Sequence to consume.

    Returns:
        An ``Iter`` wrapping the stream of intermediate results.

    See Also:
        :func:`acc`
        :func:`fold`
    """
    partials = _accumulate(to_callable(func), seq, init)
    return Iter(partials)


@fn.curry(4)
def fold_by(key: Func, op, init, seq: Seq) -> dict:
    """
    Group items by ``key`` and fold each group with ``op`` in one pass.

    This avoids building the intermediate groups that a separate group-by
    followed by a fold would need.

    Args:
        key: Grouping function applied to each item.
        op: Binary reducer applied within each group.
        init: Starting value of every group's accumulator.
        seq: Sequence to consume.

    Returns:
        A dict mapping each key to the folded value of its group.

    Examples:
        >>> sk.fold_by(X % 2, op.add, 0, [1, 2, 3, 4, 5])
        {1: 9, 0: 6}

    See Also:
        :func:`fold`
        :func:`group_by`
        :func:`reduce_by`
    """
    folded = reduceby(key, op, seq, init)
    return folded


@fn.curry(3)
def reduce_by(key: Func, op, seq: Seq) -> dict:
    """
    Variant of fold_by that takes no explicit initial value.

    Each group is seeded with the first element that falls into it.

    Args:
        key: Grouping function applied to each item.
        op: Binary reducer applied within each group.
        seq: Sequence to consume.

    Returns:
        A dict mapping each key to the reduced value of its group.

    Examples:
        >>> sk.reduce_by(X % 2, op.add, [1, 2, 3, 4, 5])
        {1: 9, 0: 6}

    See Also:
        :func:`fold_by`
        :func:`reduce`
        :func:`group_by`
    """
    reduced = reduceby(key, op, seq)
    return reduced


@fn.curry(1)
def fold_together(seq: Seq, **kwargs) -> dict:
    """
    Run several folds over the same sequence in a single pass.

    Every keyword argument is a ``(function, initial_value)`` pair; the
    result maps each keyword name to the final value of its fold.

    Args:
        seq: Sequence to consume (iterated exactly once).
        **kwargs: ``name=(func, init)`` pairs describing each fold.

    Returns:
        A dict with one entry per keyword, in the order they were given.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> sk.fold_together(seq, sum=((X + Y), 0), prod=((X * Y), 1))
        {'sum': 15, 'prod': 120}
    """
    # Split the (func, init) pairs into two parallel lists so that the
    # accumulators can be updated in place by index.
    reducers = []
    totals = []
    for func, start in kwargs.values():
        reducers.append(func)
        totals.append(start)

    for item in seq:
        for idx, func in enumerate(reducers):
            totals[idx] = func(totals[idx], item)

    return dict(zip(kwargs, totals))


@fn.curry(1)
def reduce_together(seq: Seq, **kwargs):
    """
    Variant of fold_together that needs no explicit initial values.

    The first element, split off with ``uncons``, seeds every accumulator;
    the remaining elements are then folded with :func:`fold_together`.
    Intended for non-empty sequences only.

    Args:
        seq: Sequence to consume.
        **kwargs: ``name=func`` pairs, one binary reducer per result.

    Returns:
        Whatever :func:`fold_together` returns for the seeded reducers.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> sk.reduce_together(seq, sum=(X + Y), prod=(X * Y), max=max, min=min)
        {'sum': 15, 'prod': 120, 'max': 5, 'min': 1}
    """
    first, rest = uncons(seq)
    seeded = {name: (func, first) for name, func in kwargs.items()}
    return fold_together(rest, **seeded)


@fn.curry(1)
def scan_together(seq: Seq, *args, **kwargs) -> Iter[dict]:
    """
    Folds using multiple functions simultaneously.

    Initial value is passed as a tuple for each (operator, value) keyword
    argument.

    If arguments are passed positionally, yields tuples of results.

    When no reducer is given at all, the call is treated as an empty set of
    keyword reducers: the result is still an ``Iter``, yielding one empty
    dict for the initial state and one per element of ``seq``.

    Args:
        seq: Sequence to consume.
        *args: ``(func, init)`` pairs; results are yielded as tuples.
        **kwargs: ``name=(func, init)`` pairs; results are yielded as dicts.

    Returns:
        An ``Iter`` over the intermediate states, starting with the initial
        values.

    Raises:
        ValueError: If both positional and keyword reducers are given.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> for acc in sk.scan_together(seq, sum=(X + Y, 0), prod=(X * Y, 1)):
        ...     print(acc)
        {'sum': 0, 'prod': 1}
        {'sum': 1, 'prod': 1}
        {'sum': 3, 'prod': 2}
        {'sum': 6, 'prod': 6}
        {'sum': 10, 'prod': 24}
        {'sum': 15, 'prod': 120}
    """
    if args and kwargs:
        raise ValueError(
            "cannot define positional and keyword arguments simultaneously"
        )
    if args:
        return Iter(_scan_together_args(seq, args))
    # Also covers the "no reducers" case, which previously fell through and
    # returned None instead of an iterator.
    return Iter(_scan_together_kwargs(seq, kwargs))


def _scan_together_kwargs(seq, kwargs):
    """
    Generator behind the keyword form of ``scan_together``.

    ``kwargs`` maps each name to a ``(func, init)`` pair. The first yielded
    dict holds the initial values; afterwards one dict is yielded per item
    of ``seq`` with every accumulator advanced by its own function.
    """
    reducers = {}
    state = {}
    for name, (func, start) in kwargs.items():
        reducers[name] = func
        state[name] = start

    # Always hand out copies so the private running state cannot be altered
    # through a dict that was already yielded.
    yield dict(state)

    for item in seq:
        for name, func in reducers.items():
            state[name] = func(state[name], item)
        yield dict(state)


def _scan_together_args(seq, args):
    """
    Generator behind the positional form of ``scan_together``.

    ``args`` is a collection of ``(func, init)`` pairs. The first yielded
    tuple holds the initial values; afterwards one tuple is yielded per item
    of ``seq`` with every accumulator advanced by its own function.
    """
    reducers = []
    seeds = []
    for func, start in args:
        reducers.append(func)
        seeds.append(start)

    state = tuple(seeds)
    yield state

    for item in seq:
        state = tuple(
            func(prev, item) for func, prev in zip(reducers, state)
        )
        yield state


@fn.curry(1)
def acc_together(seq: Seq, **kwargs) -> Iter[dict]:
    """
    Variant of scan_together that needs no explicit initial values.

    The first element, split off with ``uncons``, seeds every accumulator;
    the remaining elements are then scanned with :func:`scan_together`.
    Intended for non-empty sequences only.

    Args:
        seq: Sequence to consume.
        **kwargs: ``name=func`` pairs, one binary reducer per result.

    Returns:
        Whatever :func:`scan_together` returns for the seeded reducers.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> for acc in sk.acc_together(seq, sum=(X + Y), prod=(X * Y)):
        ...     print(acc)
        {'sum': 1, 'prod': 1}
        {'sum': 3, 'prod': 2}
        {'sum': 6, 'prod': 6}
        {'sum': 10, 'prod': 24}
        {'sum': 15, 'prod': 120}
    """
    first, rest = uncons(seq)
    seeded = {name: (func, first) for name, func in kwargs.items()}
    return scan_together(rest, **seeded)


#
# Special reductions
#
@fn
def product(seq: Seq, init=1):
    """
    Multiply together every element of a sequence.

    Args:
        seq: Sequence of factors.
        init: Starting value of the running product; returned unchanged
            when the sequence is empty.

    Returns:
        The accumulated product.

    Examples:
        >>> sk.product([1, 2, 3, 4, 5])
        120

    See Also:
        :func:`products`
        :func:`sum`
    """
    # Each new item goes on the left-hand side of the multiplication.
    return _reduce(lambda total, item: item * total, seq, init)


@fn
def sum(seq: Seq, init=0):
    """
    Add up every element of a sequence.

    Thin wrapper around the builtin ``sum``, provided as the counterpart of
    :func:`product`.

    Args:
        seq: Sequence of terms.
        init: Starting value, forwarded to the builtin ``sum``.

    Returns:
        The result of the builtin ``sum(seq, init)``.

    Examples:
        >>> sk.sum([1, 2, 3, 4, 5])
        15

    See Also:
        :func:`sums`
        :func:`product`
    """
    total = _sum(seq, init)
    return total


@fn.curry(1)
@generator
def products(seq: Seq, *, init=1):
    """
    Yield the running product of a sequence.

    Args:
        seq: Sequence of factors.
        init: Starting value of the running product. It is not emitted
            itself; the first output is ``seq[0] * init``.

    Examples:
        >>> sk.products([1, 2, 3, 4, 5])
        sk.iter([1, 2, 6, 24, 120])

    See Also:
        :func:`acc`
        :func:`sums`
    """
    running = init
    for factor in seq:
        running = factor * running
        yield running


@fn
@generator
def sums(seq: Seq, *, init=0):
    """
    Yield the running sum of a sequence.

    Args:
        seq: Sequence of terms.
        init: Starting value of the running sum. It is not emitted itself;
            the first output is ``seq[0] + init``.

    Examples:
        >>> sk.sums([1, 2, 3, 4, 5])
        sk.iter([1, 3, 6, 10, 15])

    See Also:
        :func:`acc`
        :func:`products`
    """
    running = init
    for term in seq:
        running = term + running
        yield running


@fn.curry(2)
def all_by(pred: Pred, seq: Seq) -> bool:
    """
    Tell whether every element of ``seq`` satisfies ``pred``.

    Evaluation stops at the first element whose predicate result is falsy.
    An empty sequence gives True.

    The predicate is normalized with ``to_callable``; ``all_by(None, seq)``
    is documented as equivalent to the builtin ``all(seq)``.

    Args:
        pred: Predicate applied to each element.
        seq: Sequence to inspect.

    Examples:
        >>> sk.all_by((X % 2), [1, 3, 5])
        True

    See Also:
        :func:`any_by`
    """
    check = to_callable(pred)
    for item in seq:
        if not check(item):
            return False
    return True


@fn.curry(2)
def any_by(pred: Pred, seq: Seq) -> bool:
    """
    Tell whether at least one element of ``seq`` satisfies ``pred``.

    Evaluation stops at the first element whose predicate result is truthy.
    An empty sequence gives False.

    The predicate is normalized with ``to_callable``; ``any_by(None, seq)``
    is documented as equivalent to the builtin ``any(seq)``.

    Args:
        pred: Predicate applied to each element.
        seq: Sequence to inspect.

    Examples:
        >>> sk.any_by(sk.is_divisible_by(2), [2, 3, 5, 7, 11, 13])
        True

    See Also:
        :func:`all_by`
    """
    check = to_callable(pred)
    for item in seq:
        if check(item):
            return True
    return False


@fn.curry(2)
def top_k(k: int, seq: Seq, *, key: Func = None) -> tuple:
    """
    Pick the ``k`` largest elements of a sequence.

    Args:
        k: Number of elements to keep.
        seq: Sequence to search.
        key: Optional ranking function, forwarded as-is to the underlying
            ``topk`` helper.

    Returns:
        The selected elements, as produced by the ``topk`` helper.

    Examples:
        >>> sk.top_k(3, "hello world")
        ('w', 'r', 'o')
    """
    largest = _topk(k, seq, key)
    return largest