# paco/pipe.py
#
# Summary
#
# Maintainability: A (25 mins)
# Test Coverage
# -*- coding: utf-8 -*-
import asyncio
import functools
from inspect import isfunction, getargspec
from .generator import consume
from .assertions import isiter

# Error raised when a value entering a pipeline is not iterable.
# NOTE: this is a single pre-built TypeError instance shared by every
# raise site in this module, not an exception class.
IterableError = TypeError('pipeline yielded a non iterable object')


class PipeOverloader(object):
    """
    Pipe operator overloader object wrapping a given fn.

    The instance memoizes the wrapped function together with the positional
    and keyword arguments supplied so far. When it is used as the right
    operand of ``|``, the left operand is inserted as the second positional
    argument and the wrapped function is called.

    The memoized arguments are dropped once the wrapped function has been
    invoked, so an instance is intended to be used a single time.

    Arguments:
        fn (function): wrapped function to call.
        args (tuple): memoized positional arguments.
        kw (dict): memoized keyword arguments.
    """
    def __init__(self, fn, args, kw):
        self.__fn = fn
        self.__args = args
        self.__kw = kw

    @asyncio.coroutine
    def __await_coro(self, coro):
        """
        Wait for ``coro``, feed its result to the wrapped function and
        delegate to whatever the wrapped function returns.
        """
        return (yield from self.__trigger((yield from coro)))

    @asyncio.coroutine
    def __consume_generator(self, iterable):
        """
        Drain ``iterable`` through ``consume``, feed the outcome to the
        wrapped function and delegate to whatever that function returns.
        """
        return (yield from self.__trigger((yield from consume(iterable))))

    def __trigger(self, iterable):
        """
        Call the wrapped function with ``iterable`` as second argument.

        Arguments:
            iterable: value received from the left side of the pipe.

        Raises:
            TypeError: if ``isiter`` reports that the value is not iterable.

        Returns:
            whatever the wrapped function returns.
        """
        if not isiter(iterable):
            raise IterableError

        # Compose arguments, placing iterable as second one
        args = self.__args[:1] + (iterable,) + self.__args[1:]
        # Call wrapped function
        result = self.__fn(*args, **self.__kw)

        # Clean memoized arguments to prevent memory leaks
        self.__args = None
        self.__kw = None

        # Return actual result
        return result

    def __ror__(self, iterable):
        """
        Overloads ``|`` operator expressions.

        Arguments:
            iterable: left operand of the ``|`` expression. Objects exposing
                ``__anext__`` are consumed first and coroutine objects are
                awaited first; anything else is passed on directly.

        Raises:
            TypeError: if the left operand is ``None`` or is rejected as
                non iterable when the wrapped function is triggered.

        Returns:
            the wrapped function result, or a coroutine resolving it when
            the left operand has to be consumed or awaited first.
        """
        # Only None is rejected up front. Empty containers are falsy but are
        # still valid pipeline inputs, so a plain truthiness test would
        # wrongly refuse them; other invalid values are caught by the
        # iterable check performed right before calling the function.
        if iterable is None:
            raise IterableError

        if hasattr(iterable, '__anext__'):
            return self.__consume_generator(iterable)

        if asyncio.iscoroutine(iterable):
            return self.__await_coro(iterable)

        return self.__trigger(iterable)

    def __call__(self, *args, **kw):
        """
        Maintain callable object behaviour.

        The given arguments are appended to the memoized positional
        arguments and merged over the memoized keyword arguments before
        calling the wrapped function.

        Returns:
            whatever the wrapped function returns.
        """
        _args = self.__args + args
        # Merge into a copy so the dict handed to the constructor is never
        # mutated on behalf of the caller.
        _kw = dict(self.__kw)
        _kw.update(kw)

        # Clean memoized values
        self.__args = None
        self.__kw = None

        return self.__fn(*_args, **_kw)


def overload(fn):
    """
    Overload a given callable object to be used with ``|`` operator
    overloading.

    This is especially used for composing a pipeline of
    transformation over a single data set.

    The returned wrapper calls ``fn`` directly when the iterable has been
    supplied (two or more positional arguments, or an ``iterable`` keyword
    argument). Otherwise it returns a ``PipeOverloader`` holding the
    arguments given so far, ready to receive the iterable via ``|``.

    Arguments:
        fn (function): target function to decorate.

    Raises:
        TypeError: if ``fn`` is not a function.
        ValueError: if ``fn`` takes no variadic positional arguments and
            its second positional parameter is not named ``iterable``.

    Returns:
        function: decorated function
    """
    # Imported locally: ``getfullargspec`` supersedes the deprecated
    # ``getargspec`` and, unlike it, accepts functions declaring
    # keyword-only parameters or annotations.
    from inspect import getfullargspec

    if not isfunction(fn):
        raise TypeError('paco: fn must be a callable object')

    spec = getfullargspec(fn)
    params = spec.args
    if not spec.varargs and (len(params) < 2 or params[1] != 'iterable'):
        raise ValueError('paco: invalid function signature or arity')

    @functools.wraps(fn)
    def decorator(*args, **kw):
        # The iterable is still missing only when it was given neither
        # positionally nor by keyword: defer the call until it is piped in.
        if len(args) < 2 and 'iterable' not in kw:
            return PipeOverloader(fn, args, kw)
        # Otherwise, behave like a normal wrapper
        return fn(*args, **kw)

    return decorator