sidekick-seq/sidekick/seq/lib_grouping.py
from itertools import tee, chain, takewhile, dropwhile, islice, count
from toolz import groupby
from .iter import Iter, generator
from .lib_basic import uncons
from .._toolz import partition_all, partition as _partition, sliding_window, partitionby
from ..functions import fn, to_callable
from ..typing import (
Seq,
NOT_GIVEN,
TYPE_CHECKING,
Union,
Pred,
Func,
Tuple,
Iterable,
T,
Literal,
)
_next = next
if TYPE_CHECKING:
from .. import api as sk # noqa: F401
from ..api import X, Y # noqa: F401
ChunksHow = Literal["values", "pairs", "left", "right", "drop"]
@fn.curry(2)
def group_by(key: Func, seq: Seq) -> dict:
"""
Group collection by the results of a key function.
Examples:
>>> sk.group_by((X % 2), range(5))
{0: [0, 2, 4], 1: [1, 3]}
See Also:
:func:`reduce_by`
:func:`fold_by`
"""
return groupby(key, seq)
@fn.curry(2)
def chunks(
n: Union[int, Iterable[int]], seq: Seq[T], *, pad=NOT_GIVEN, drop: bool = False
) -> Iter[Tuple[T, ...]]:
"""
Partition sequence into non-overlapping tuples of length n.
Args:
n:
Number of elements in each partition. If n is a sequence, it selects
partitions by the sequence size.
seq:
Input sequence.
pad:
If given, pad a trailing incomplete partition with this value until
it has n elements.
drop:
If True, drop the last partition if it has less than n elements.
Examples:
Too see the difference between padding, dropping and the regular
behavior.
>>> sk.chunks(2, range(5))
sk.iter([(0, 1), (2, 3), (4,)])
>>> sk.chunks(2, range(5), pad=None)
sk.iter([(0, 1), (2, 3), (4, None)])
>>> sk.chunks(2, range(5), drop=True)
sk.iter([(0, 1), (2, 3)])
Using sequences, we can create more complicated chunking patterns.
>>> sk.chunks(sk.cycle([2, 3]), range(10))
sk.iter([(0, 1), (2, 3, 4), (5, 6), (7, 8, 9)])
A trailing ellipsis consumes the rest of the iterator.
>>> sk.chunks([1, 2, 3, ...], range(10))
sk.iter([(0,), (1, 2), (3, 4, 5), (6, 7, 8, 9)])
See Also:
:func:`chunks_by`
:func:`window`
"""
if isinstance(n, int):
if drop:
return Iter(_partition(n, seq))
elif pad is not NOT_GIVEN:
return Iter(_partition(n, seq, pad))
else:
return Iter(partition_all(n, seq))
elif drop:
return Iter(_chunks_sizes_ex(n, seq, True, None))
elif pad:
return Iter(_chunks_sizes_ex(n, seq, False, pad))
else:
return Iter(_chunks_sizes(n, seq))
def _chunks_sizes(ns, seq):
for n in ns:
if n is ...:
yield tuple(seq)
break
else:
yield tuple(islice(seq, n))
def _chunks_sizes_ex(ns, seq, drop, pad):
buf = []
clear = buf.clear
fill = buf.extend
seq = iter(seq)
for n in ns:
if n is ...:
yield tuple(seq)
break
else:
clear()
try:
fill(next(seq) for _ in range(n))
yield tuple(buf)
except (RuntimeError, StopIteration):
if drop or not buf:
return
m = n - len(buf)
fill([pad] * m)
yield tuple(buf)
break
@fn.curry(2)
def chunks_by(func: Func, seq: Seq, how: ChunksHow = "values") -> Iter:
"""
Partition sequence into chunks according to a function.
It creates a new partition every time the value of func(item) changes.
Args:
func:
Function used to control partition creation
seq:
Input sequence.
how:
Control how func is used to create new chunks from iterator.
* 'values' (default): create a new chunk when func(x) changes value
* 'pairs': create new chunk when func(x, y) for two successive values
is True.
* 'left': create new chunk when func(x) is True. x is put in the
chunk to the left.
* 'right': create new chunk when func(x) is True. x is put in the
chunk to the right.
* 'drop': create new chunk when func(x) is True. x dropped from output
sequence. It behaves similarly to str.split.
Examples:
Standard chunker
>>> sk.chunks_by((X // 3), range(10))
sk.iter([(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)])
Chunk by pairs
>>> sk.chunks_by((Y <= X), [1, 2, 3, 2, 4, 8, 0, 1], how='pairs') # noqa
sk.iter([(1, 2, 3), (2, 4, 8), (0, 1)])
Chunk by predicate. The different versions simply define in which chunk
the split location will be allocated
>>> sk.chunks_by(sk.is_odd, [1, 2, 3, 2, 4, 8], how='left')
sk.iter([(1,), (2, 3), (2, 4, 8)])
>>> sk.chunks_by(sk.is_odd, [1, 2, 3, 2, 4, 8], how='right')
sk.iter([(1, 2), (3, 2, 4, 8)])
>>> sk.chunks_by(sk.is_odd, [1, 2, 3, 2, 4, 8], how='drop')
sk.iter([(), (2,), (2, 4, 8)])
See Also:
:func:`chunks`
:func:`partition`
"""
if how == "values":
return Iter(partitionby(to_callable(func), seq))
elif how == "pairs":
return Iter(_chunks_pairs(func, seq))
elif how == "left":
return Iter(_chunks_left(func, seq))
elif how == "right":
return Iter(_chunks_right(func, seq))
elif how in ("drop", "split"):
return Iter(_chunks_split(func, seq))
else:
raise ValueError(f"invalid method: {how!r}")
def _chunks_pairs(pred, seq):
try:
x, it = uncons(seq)
except StopIteration:
return
buf = [x]
add = buf.append
clear = buf.clear
for y in it:
if pred(x, y):
yield tuple(buf)
clear()
add(y)
x = y
yield tuple(buf)
def _chunks_right(pred, seq):
buf = []
add = buf.append
clear = buf.clear
for x in seq:
if pred(x) and buf:
yield tuple(buf)
clear()
add(x)
yield tuple(buf)
def _chunks_left(pred, seq):
buf = []
add = buf.append
clear = buf.clear
for x in seq:
add(x)
if pred(x) and buf:
yield tuple(buf)
clear()
if buf:
yield tuple(buf)
def _chunks_split(pred, seq):
buf = []
add = buf.append
clear = buf.clear
for x in seq:
if pred(x):
yield tuple(buf)
clear()
else:
add(x)
yield tuple(buf)
@fn.curry(2)
def window(n: int, seq: Seq) -> Iter:
"""
Return a sequence of overlapping sub-sequences of size n.
``n == 2`` is equivalent to a pairwise iteration.
Examples:
Pairwise iteration:
>>> [''.join(p) for p in sk.window(2, "hello!")] # noqa
['he', 'el', 'll', 'lo', 'o!']
See Also:
:func:`pairs`
"""
return sliding_window(n, seq)
@fn.curry(1)
def pairs(seq: Seq, *, prev=NOT_GIVEN, next=NOT_GIVEN) -> Iter:
"""
Returns an iterator of a pair adjacent items.
This is similar to window(2), but requires a fill value specified either
with ``prev`` or ``next`` to select if it will form pairs with the preceeding
of the following value.
It must specify either prev or next, never both.
Args:
seq:
Input sequence.
prev:
If given, fill this value in the first item and iterate over all
items preceded with the previous value.
next:
If given, fill this value in the last item and iterate over all
items followed with the next value.
Examples:
>>> [''.join(p) for p in sk.pairs("hello!", prev="-")] # noqa
['-h', 'he', 'el', 'll', 'lo', 'o!']
>>> [''.join(p) for p in sk.pairs("hello!", next="!")] # noqa
['he', 'el', 'll', 'lo', 'o!', '!!']
See Also:
:func:`window`
"""
if prev is NOT_GIVEN and next is NOT_GIVEN:
raise TypeError("must specify either prev or next keyword arguments")
elif prev is NOT_GIVEN:
a, b = tee(seq)
_next(b, None)
return Iter(zip(a, chain(b, [next])))
elif next is NOT_GIVEN:
return Iter(_pairs_prev(seq, prev))
else:
raise TypeError("must specify either prev or next keyword arguments, not both")
def _pairs_prev(seq, prev):
seq = iter(seq)
for x in seq:
yield (prev, x)
prev = x
@fn.curry(2)
def partition(
key: Union[int, Pred], seq: Seq, sep: bool = False
) -> Union[Tuple[Seq, Seq], Tuple[list, list, Seq]]:
"""
Partition sequence in two.
Returns a sequence with elements before and after the key separator.
Args:
key:
An integer index or predicate used to partition sequence.
seq:
Input sequence.
sep:
If True, split into 3 parts: head, sep, tail, analogously to the
builtin str.partition function. The first two elements are lists
and can be easily tested for emptiness.
Examples:
>>> a, b = sk.partition(2, [5, 4, 3, 2, 1])
>>> a
sk.iter([5, 4])
>>> b
sk.iter([3, 2, 1])
>>> a, b = sk.partition((X == 3), [1, 2, 3, 4, 5])
>>> a
sk.iter([1, 2])
>>> b
sk.iter([3, 4, 5])
>>> sk.partition((X == 3), [1, 2, 3, 4, 5], sep=True)
([1, 2], [3], sk.iter([4, 5]))
"""
if sep:
seq = iter(seq)
first = []
sep: list = []
if isinstance(key, int):
for i, x in zip(range(key + 1), seq):
first.append(x)
if len(first) == key + 1:
sep = [first.pop()]
else:
for x in seq:
if key(x):
sep.append(x)
break
else:
first.append(x)
return first, sep, Iter(seq)
if isinstance(key, int):
a, b = tee(seq)
return Iter(islice(a, key)), Iter(islice(b, key, None))
else:
pred = to_callable(key)
a, b = tee((pred(x), x) for x in seq)
value = lambda x: x[1]
return (
Iter(map(value, takewhile(lambda x: not x[0], a))),
Iter(map(value, dropwhile(lambda x: not x[0], b))),
)
@fn
def distribute(n: int, seq: Seq) -> Tuple[Seq, ...]:
"""
Distribute items of seq into n different sequences.
Args:
n:
Number of output sequences.
seq:
Input sequences.
Examples:
>>> a, b = sk.distribute(2, [0, 1, 2, 3, 4, 5, 6])
>>> a
sk.iter([0, 2, 4, 6])
>>> b
sk.iter([1, 3, 5])
"""
results = tee(seq, n)
return tuple(Iter(islice(it, i, None, n)) for i, it in enumerate(results))
@generator
def inits(seq: Seq) -> Seq:
"""
Lazily return all sub-sequences at beginning of seq.
Examples:
>>> [''.join(sub) for sub in inits('abc')] # noqa
['', 'a', 'ab', 'abc']
"""
seq, consume = tee(seq)
i = 0
for i, x in enumerate(consume):
seq, consume = tee(seq)
yield islice(consume, i)
seq, consume = tee(seq)
yield islice(consume, i + 1)