rohanpm/more-executors

View on GitHub
more_executors/_impl/futures/bool.py

Summary

Maintainability
B
6 hrs
Test Coverage
A
100%
# -*- coding: utf-8 -*-

from threading import Lock
import logging
from concurrent.futures import Future

from .base import chain_cancel, weak_callback
from ..common import copy_future_exception, try_set_result
from .check import ensure_futures
from ..logwrap import LogWrapper
from ..metrics import track_future

LOG = LogWrapper(logging.getLogger("more_executors.futures"))


class BoolOperation(object):
    def __init__(self, fs):
        self.fs = {}
        for f in fs:
            self.fs[f] = True

        self.done = False
        self.lock = Lock()
        self.out = Future()

        for f in fs:
            chain_cancel(self.out, f)
            f.add_done_callback(weak_callback(self.handle_done))

    def get_state_update(self, f):
        raise NotImplementedError()  # pragma: no cover

    def handle_done(self, f):
        set_result = False
        set_exception = False
        cancel_futures = set()

        with self.lock:
            if self.done:
                return

            del self.fs[f]

            (set_result, set_exception, cancel_futures) = self.get_state_update(f)

        if set_result:
            try_set_result(self.out, f.result())
        if set_exception:
            copy_future_exception(f, self.out)

        for to_cancel in cancel_futures:
            to_cancel.cancel()


class OrOperation(BoolOperation):
    def get_state_update(self, f):
        set_result = False
        set_exception = False
        cancel_futures = set()

        # If it's the last result or it's a successful result:
        if (not self.fs) or (not f.cancelled() and not f.exception() and f.result()):
            self.done = True
            cancel_futures = list(self.fs.keys())
            if f.cancelled():
                # Cancelled => output is cancelled
                cancel_futures.append(self.out)
            elif f.exception():
                # Failed
                set_exception = True
            else:
                # Last or successful
                set_result = True

        return (set_result, set_exception, cancel_futures)


@ensure_futures
def f_or(f, *fs):
    """Boolean ``OR`` over a number of futures.

    Signature: :code:`Future<A>[, Future<B>[, ...]] ⟶ Future<A|B|...>`

    Arguments:
        f (~concurrent.futures.Future)
            Any future
        fs (~concurrent.futures.Future)
            Any futures

    Returns:
        :class:`~concurrent.futures.Future`
            A future resolved from the inputs using ``OR`` semantics:

            - Resolved with the earliest true value returned by an input
              future, if any.
            - Otherwise, resolved with the latest false value or exception
              returned by the input futures.

    .. note::
        This function is tested with up to 100,000 input futures.
        Exceeding this limit may result in performance issues.

    .. versionadded:: 1.19.0
    """
    if not fs:
        return f

    oper = OrOperation([f] + list(fs))
    track_future(oper.out, type="or")
    return oper.out


class AndOperation(BoolOperation):
    def get_state_update(self, f):
        set_result = False
        set_exception = False
        cancel_futures = set()

        if f.cancelled():
            # Cancelled => output is cancelled
            self.done = True
            cancel_futures = list(self.fs.keys())
            cancel_futures.append(self.out)
        elif f.exception():
            # Failed => we're done
            self.done = True
            set_exception = True
            cancel_futures = list(self.fs.keys())
        elif (not f.result()) or (not self.fs):
            # Falsey result or last result => we're done
            self.done = True
            cancel_futures = list(self.fs.keys())
            set_result = True

        return (set_result, set_exception, cancel_futures)


@ensure_futures
def f_and(f, *fs):
    """Boolean ``AND`` over a number of futures.

    Signature: :code:`Future<A>[, Future<B>[, ...]] ⟶ Future<A|B|...>`

    Arguments:
        f (~concurrent.futures.Future)
            Any future
        fs (~concurrent.futures.Future)
            Any futures

    Returns:
        :class:`~concurrent.futures.Future`
            A future resolved from the inputs using ``AND`` semantics:

            - Resolved with the latest value returned by an input
              future, if all futures are resolved with true values.
            - Otherwise, resolved with the earliest false value or exception
              returned by the input futures.

    .. note::
        This function is tested with up to 100,000 input futures.
        Exceeding this limit may result in performance issues.

    .. versionadded:: 1.19.0
    """
    if not fs:
        return f

    oper = AndOperation([f] + list(fs))
    track_future(oper.out, type="and")
    return oper.out