sidekick-seq/sidekick/seq/lib_reducers.py
import warnings
from functools import reduce as _reduce
from .iter import Iter, generator
from .lib_basic import uncons
from .._toolz import accumulate as _accumulate, topk as _topk, reduceby
from ..functions import fn, to_callable
from ..typing import Func, Seq, Pred, TYPE_CHECKING, NOT_GIVEN
if TYPE_CHECKING:
from .. import api as sk # noqa: F401
from ..api import X, Y # noqa: F401
# Keep a handle on the builtin ``sum``: this module defines its own ``sum``
# function further down, which shadows the builtin name at module level.
_sum = sum
#
# Basic reductions and folds
#
@fn.curry(3)
def fold(func: Func, init, seq: Seq):
    """
    Left-fold a sequence into a single value, starting from ``init``.

    ``func`` is passed through ``to_callable`` and then applied as
    ``func(accumulated, item)`` for each item of ``seq``, from left to right.
    An empty ``seq`` simply returns ``init``.

    Examples:
        >>> sk.fold(op.add, 0, [1, 2, 3, 4])
        10

    See Also:
        :func:`reduce`
        :func:`scan`
    """
    combine = to_callable(func)
    result = _reduce(combine, seq, init)
    return result
@fn.curry(2)
def reduce(func: Func, seq: Seq, initial=NOT_GIVEN):
    """
    Like :func:`fold`, but takes the first element as the starting value.

    Passing ``initial`` is still accepted, but it emits a warning and
    delegates to :func:`fold`; call :func:`fold` directly instead.

    Without ``initial`` the work is done by ``functools.reduce``, which
    raises ``TypeError`` when the sequence is empty.

    Examples:
        >>> sk.reduce(op.add, [1, 2, 3, 4])
        10

    See Also:
        :func:`fold`
        :func:`acc`
    """
    if initial is NOT_GIVEN:
        # Common path: the first element of seq seeds the reduction.
        return _reduce(to_callable(func), seq)

    warnings.warn("use the sk.fold() function to set initial values.")
    return fold(func, initial, seq)
@fn.curry(2)
def acc(func: Func, seq: Seq) -> Iter:
    """
    Accumulate ``seq`` with ``func`` without an explicit initial value.

    Works like :func:`scan`, except that the first item of the sequence
    plays the role of the initial value. The result is wrapped in an
    ``Iter``.

    See Also:
        :func:`scan`
        :func:`reduce`
    """
    combine = to_callable(func)
    partials = _accumulate(combine, seq)
    return Iter(partials)
@fn.curry(3)
def scan(func: Func, init, seq: Seq) -> Iter:
    """
    Return the intermediate folds of ``seq`` by ``func``, seeded with ``init``.

    The accumulation steps are::

        func(init, seq[0]), func(result[0], seq[1]), ...

    in which ``result[i]`` stands for the value produced by the i-th step.
    The result is wrapped in an ``Iter``.

    See Also:
        :func:`acc`
        :func:`fold`
    """
    combine = to_callable(func)
    partials = _accumulate(combine, seq, init)
    return Iter(partials)
@fn.curry(4)
def fold_by(key: Func, op, init, seq: Seq) -> dict:
    """
    Group the elements of ``seq`` by ``key`` and fold each group with ``op``.

    Every group starts from ``init``. The grouping and the reduction are
    performed together by ``reduceby``, so the intermediate groups do not
    have to be stored separately.

    Examples:
        >>> sk.fold_by(X % 2, op.add, 0, [1, 2, 3, 4, 5])
        {1: 9, 0: 6}

    See Also:
        :func:`fold`
        :func:`group_by`
        :func:`reduce_by`
    """
    folded = reduceby(key, op, seq, init)
    return folded
@fn.curry(3)
def reduce_by(key: Func, op, seq: Seq) -> dict:
    """
    Group the elements of ``seq`` by ``key`` and reduce each group with ``op``.

    Similar to :func:`fold_by`, but no initial value is given: the first
    element that falls into a group is used as that group's starting value.

    Examples:
        >>> sk.reduce_by(X % 2, op.add, [1, 2, 3, 4, 5])
        {1: 9, 0: 6}

    See Also:
        :func:`fold`
        :func:`group_by`
        :func:`fold_by`
    """
    reduced = reduceby(key, op, seq)
    return reduced
@fn.curry(1)
def fold_together(seq: Seq, **kwargs) -> dict:
    """
    Fold ``seq`` with several functions in a single pass.

    Each keyword argument is an ``(operator, initial)`` pair. The returned
    dictionary maps each keyword to the final value of its fold.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> sk.fold_together(seq, sum=((X + Y), 0), prod=((X * Y), 1))
        {'sum': 15, 'prod': 120}
    """
    # Split the (operator, initial) pairs into two parallel lists so the
    # running values can be updated in place by position.
    operators = []
    totals = []
    for operator, start in kwargs.values():
        operators.append(operator)
        totals.append(start)

    for item in seq:
        for idx, operator in enumerate(operators):
            totals[idx] = operator(totals[idx], item)

    return dict(zip(kwargs, totals))
@fn.curry(1)
def reduce_together(seq: Seq, **kwargs):
    """
    Reduce ``seq`` with several functions in a single pass.

    Works like :func:`fold_together`, but each keyword argument is just the
    operator: the first element of the sequence (split off with ``uncons``)
    is the initial value of every reduction, so the sequence must not be
    empty.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> sk.reduce_together(seq, sum=(X + Y), prod=(X * Y), max=max, min=min)
        {'sum': 15, 'prod': 120, 'max': 5, 'min': 1}
    """
    head, tail = uncons(seq)
    seeded = {name: (operator, head) for name, operator in kwargs.items()}
    return fold_together(tail, **seeded)
@fn.curry(1)
def scan_together(seq: Seq, *args, **kwargs) -> Iter[dict]:
    """
    Scan a sequence with multiple functions simultaneously.

    Every reducer is given as an ``(operator, initial)`` pair. The initial
    values are produced first, followed by one result per element of
    ``seq``.

    If the pairs are passed as keyword arguments, yields dictionaries keyed
    by the argument names. If they are passed positionally, yields tuples of
    results.

    When no reducer is given at all, the result is an iterator of empty
    dictionaries (the keyword form with nothing to accumulate) rather than
    ``None``.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> for acc in sk.scan_together(seq, sum=(X + Y, 0), prod=(X * Y, 1)):
        ...     print(acc)
        {'sum': 0, 'prod': 1}
        {'sum': 1, 'prod': 1}
        {'sum': 3, 'prod': 2}
        {'sum': 6, 'prod': 6}
        {'sum': 10, 'prod': 24}
        {'sum': 15, 'prod': 120}

    Raises:
        ValueError: if positional and keyword reducers are mixed.
    """
    if args and kwargs:
        raise ValueError(
            "cannot define positional and keyword arguments simultaneously"
        )
    if args:
        return Iter(_scan_together_args(seq, args))
    # Reached for the keyword form and also when no reducer was supplied;
    # the latter used to fall off the end of the function and return None.
    return Iter(_scan_together_kwargs(seq, kwargs))
def _scan_together_kwargs(seq, kwargs):
data = [(k, f, [x0]) for k, (f, x0) in kwargs.items()]
yield {k: x0[0] for k, f, x0 in data}
for x in seq:
item = {}
for k, f, box in data:
box[0] = v = f(box[0], x)
item[k] = v
yield item
def _scan_together_args(seq, args):
fs = [fn for fn, _ in args]
yield (ys := tuple(acc for _, acc in args))
for x in seq:
yield (ys := tuple(f(y, x) for (f, y) in zip(fs, ys)))
@fn.curry(1)
def acc_together(seq: Seq, **kwargs) -> Iter[dict]:
    """
    Accumulate ``seq`` with several functions simultaneously.

    Works like :func:`scan_together`, but each keyword argument is just the
    operator: the first element of the sequence (split off with ``uncons``)
    is the initial value of every accumulation, so the sequence must not be
    empty.

    Examples:
        >>> seq = [1, 2, 3, 4, 5]
        >>> for acc in sk.acc_together(seq, sum=(X + Y), prod=(X * Y)):
        ...     print(acc)
        {'sum': 1, 'prod': 1}
        {'sum': 3, 'prod': 2}
        {'sum': 6, 'prod': 6}
        {'sum': 10, 'prod': 24}
        {'sum': 15, 'prod': 120}
    """
    head, tail = uncons(seq)
    seeded = {name: (operator, head) for name, operator in kwargs.items()}
    return scan_together(tail, **seeded)
#
# Special reductions
#
@fn
def product(seq: Seq, init=1):
    """
    Multiply all elements of a sequence together.

    The running result starts at ``init`` and each element is multiplied in
    as ``element * result``. An empty sequence returns ``init``.

    Examples:
        >>> sk.product([1, 2, 3, 4, 5])
        120

    See Also:
        :func:`products`
        :func:`sum`
    """
    result = init
    for factor in seq:
        result = factor * result
    return result
@fn
def sum(seq: Seq, init=0):
    """
    Add up all elements of a sequence, starting from ``init``.

    This is a thin wrapper over the builtin ``sum`` and is provided for
    symmetry with :func:`product`.

    Examples:
        >>> sk.sum([1, 2, 3, 4, 5])
        15

    See Also:
        :func:`sums`
        :func:`product`
    """
    # ``_sum`` is the builtin, captured before this definition shadowed it.
    total = _sum(seq, init)
    return total
@fn.curry(1)
@generator
def products(seq: Seq, *, init=1):
    """
    Return a sequence of partial products.

    The running result starts at ``init`` (which is not itself yielded) and
    is updated as ``element * result`` for each element of ``seq``.

    Examples:
        >>> sk.products([1, 2, 3, 4, 5])
        sk.iter([1, 2, 6, 24, 120])

    See Also:
        :func:`acc`
        :func:`sums`
    """
    running = init
    for factor in seq:
        running = factor * running
        yield running
@fn
@generator
def sums(seq: Seq, *, init=0):
    """
    Return a sequence of partial sums.

    The running total starts at ``init`` (which is not itself yielded) and
    each element is added on the right, ``total + element``. This is the
    same operand order used by the builtin ``sum``, so the last value
    produced equals ``sum(seq, init)`` even when addition is not
    commutative (e.g. list or tuple concatenation).

    For numeric input and the default ``init`` this is the same as
    ``sk.acc((X + Y), seq)``.

    Examples:
        >>> sk.sums([1, 2, 3, 4, 5])
        sk.iter([1, 3, 6, 10, 15])

    See Also:
        :func:`acc`
        :func:`products`
    """
    total = init
    for x in seq:
        # Accumulator on the left: keeps left-to-right order for
        # non-commutative ``+``.
        total = total + x
        yield total
@fn.curry(2)
def all_by(pred: Pred, seq: Seq) -> bool:
    """
    Tell whether every element of ``seq`` satisfies the predicate.

    ``pred`` is passed through ``to_callable`` first; ``all_by(None, seq)``
    is documented as equivalent to the builtin ``all(seq)``. Evaluation
    stops at the first element for which the predicate is falsy.

    Examples:
        >>> sk.all_by((X % 2), [1, 3, 5])
        True

    See Also:
        :func:`any_by`
    """
    check = to_callable(pred)
    return all(check(item) for item in seq)
@fn.curry(2)
def any_by(pred: Pred, seq: Seq) -> bool:
    """
    Tell whether at least one element of ``seq`` satisfies the predicate.

    ``pred`` is passed through ``to_callable`` first; ``any_by(None, seq)``
    is documented as equivalent to the builtin ``any(seq)``. Evaluation
    stops at the first element for which the predicate is truthy.

    Examples:
        >>> sk.any_by(sk.is_divisible_by(2), [2, 3, 5, 7, 11, 13])
        True

    See Also:
        :func:`all_by`
    """
    check = to_callable(pred)
    return any(check(item) for item in seq)
@fn.curry(2)
def top_k(k: int, seq: Seq, *, key: Func = None) -> tuple:
    """
    Find the ``k`` largest elements of a sequence.

    The optional ``key`` is forwarded unchanged to the underlying ``topk``
    helper, which does the selection.

    Examples:
        >>> sk.top_k(3, "hello world")
        ('w', 'r', 'o')
    """
    largest = _topk(k, seq, key)
    return largest