paco/every.py

Summary

Maintainability
A
25 mins
Test Coverage
# -*- coding: utf-8 -*-
import asyncio
from .partial import partial
from .decorator import overload
from .concurrent import ConcurrentExecutor
from .assertions import assert_corofunction, assert_iter


@overload
@asyncio.coroutine
def every(coro, iterable, limit=1, loop=None):
    """
    Returns `True` if every element in a given iterable satisfies the coroutine
    asynchronous test.

    If any iteratee coroutine call returns `False`, the process is inmediately
    stopped, and `False` will be returned.

    You can increase the concurrency limit for a fast race condition scenario.

    This function is a coroutine.

    This function can be composed in a pipeline chain with ``|`` operator.

    Arguments:
        coro (coroutine function): coroutine function to call with values
            to reduce.
        iterable (iterable): an iterable collection yielding
            coroutines functions.
        limit (int): max concurrency execution limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.

    Raises:
        TypeError: if input arguments are not valid.

    Returns:
        bool: `True` if all the values passes the test, otherwise `False`.

    Usage::

        async def gt_10(num):
            return num > 10

        await paco.every(gt_10, [1, 2, 3, 11])
        # => False

        await paco.every(gt_10, [11, 12, 13])
        # => True

    """
    assert_corofunction(coro=coro)
    assert_iter(iterable=iterable)

    # Reduced accumulator value
    passes = True

    # Handle empty iterables
    if len(iterable) == 0:
        return passes

    # Create concurrent executor
    pool = ConcurrentExecutor(limit=limit, loop=loop)

    # Tester function to guarantee the file is canceled.
    @asyncio.coroutine
    def tester(element):
        nonlocal passes
        if not passes:
            return None

        if not (yield from coro(element)):
            # Flag as not test passed
            passes = False
            # Force ignoring pending coroutines
            pool.cancel()

    # Iterate and attach coroutine for defer scheduling
    for element in iterable:
        pool.add(partial(tester, element))

    # Wait until all coroutines finish
    yield from pool.run()

    return passes