fabiommendes/sidekick

View on GitHub
sidekick-seq/sidekick/seq/lib_combining.py

Summary

Maintainability
A
35 mins
Test Coverage
import itertools

from .iter import Iter, generator
from .lib_basic import is_empty
from .util import vargs
from .. import _toolz as toolz
from ..functions import fn
from ..typing import Seq, Union, Raisable, Callable, Func, TYPE_CHECKING, NOT_GIVEN

if TYPE_CHECKING:
    from .. import api as sk, to_callable  # noqa: F401


class UnalignedZipError(Exception):
    """
    Raised by zip_aligned when two iterators do not have the same size.
    """

    @classmethod
    def default(cls):
        """
        Create error with default message.
        """
        return cls("sequences are not aligned")


@fn
def concat(seqs: Union[Seq, Seq[Seq]], *extra) -> Iter:
    """
    Concatenates all iterators in sequence of iterators.

    If called with a single argument, it must be a a sequence of sequences.
    Otherwise, each argument is treated like a input sequence.

    Examples:
        >>> sk.concat([1, 2, 3], [4, 5, 6])
        sk.iter([1, 2, 3, 4, 5, 6])

        It can concatenate (possibly infinite iterators)
        >>> sk.concat(map(range, sk.nums(1, ...)))
        sk.iter([0, 0, 1, 0, 1, 2, ...])
    """
    if extra:
        return Iter(itertools.chain(seqs, *extra))
    return Iter(itertools.chain.from_iterable(seqs))


@fn.curry(1)
def interleave(seqs: Union[Seq, Seq[Seq]], *extra, aligned=False):
    """
    Interleave sequence of sequences.

    Args:
        seqs:
            a sequence of sequences
        aligned:
            If True, stops when the first iterator exausts. Aligned does not
            work with an infinite sequence of sequences, but all sequences in
            this finite list may be inifinite.

    Examples:
        >>> sk.interleave([1, 2, 3], 'abc')
        sk.iter([1, 'a', 2, 'b', 3, 'c'])

        If called with a single argument, it must be a sequence of sequences.

        >>> sk.interleave([[1, 2, 3], 'abc'])
        sk.iter([1, 'a', 2, 'b', 3, 'c'])
    """
    if extra:
        seqs = (seqs, *extra)
    if aligned:
        return Iter(_interleave_aligned(seqs))
    return Iter(toolz.interleave(seqs))


def _interleave_aligned(seqs):
    nexts = [iter(it).__next__ for it in seqs]
    buffer = []
    fill = buffer.extend

    while True:
        buffer.clear()
        try:
            fill(f() for f in nexts)
            yield from buffer
        except StopIteration:
            break


@generator
def zip_aligned(
    *args: Seq, error: Callable[[], Raisable] = UnalignedZipError.default
) -> Iter:
    """
    Zip and raises an error if sizes do not match.

    Similar to the zip built-in, but raises an UnalignedZipError if one sequence
    terminates before the others.

    Args:
        args:
            Input sequences, passed as positional arguments.
        error:
            If given, changes the error raised when sequences are not aligned.

    Examples:
        If sizes match, it is just like zip.

        >>> sk.zip_aligned((1, 2, 3), (4, 5, 6))
        sk.iter([(1, 4), (2, 5), (3, 6)])

        But gives an error otherwise.

        >>> sk.zip_aligned((1, 2, 3), (4, 5, 6, 7)).last()
        Traceback (most recent call last):
        ...
        UnalignedZipError: sequences are not aligned
    """
    args = tuple(map(iter, args))
    yield from zip(*args)
    if not all(map(is_empty, args)):
        raise error()


@fn
def diff(*seqs: Seq, key: Func = None, default=NOT_GIVEN) -> Iter:
    """
    Return those items that differ in a pairwise comparison between sequences.

    Examples:
        >>> sk.diff([1, 2, 3, 4], [1, 2, 4, 8])
        sk.iter([(3, 4), (4, 8)])
    """
    kwargs = {}
    if key is not None:
        kwargs["key"] = to_callable(key)
    if default is not NOT_GIVEN:
        kwargs["default"] = default
    return Iter(toolz.diff(*vargs(seqs), **kwargs))


@fn.curry(4)
def join(leftkey: Func, leftseq: Seq, rightkey: Func, rightseq: Seq, **kwargs) -> Iter:
    """
    Join two sequences on common attributes.
    """

    leftkey = to_callable(leftkey)
    rightkey = to_callable(rightkey)
    return Iter(toolz.join(leftkey, leftseq, rightkey, rightseq, **kwargs))


@fn
def merge_sorted(*seqs, key=None) -> Iter:
    """
    Merge and sort a collection of sorted collections.

    Examples:
        >>> sk.merge_sorted(sk.nums(1, 3, ..., 9), sk.nums(0, 2, ..., 10))
        sk.iter([0, 1, 2, 3, 4, 5, ...])
    """
    seqs = seqs[0] if len(seqs) == 1 else seqs
    key = key and to_callable(key)
    return Iter(toolz.merge_sorted(*seqs, key=key))