rohanpm/more-executors

View on GitHub
more_executors/_impl/retry.py

Summary

Maintainability
C
1 day
Test Coverage
A
100%
from concurrent.futures import Executor
from threading import RLock, Thread
import logging
import weakref

try:
    from time import monotonic
except ImportError:  # pragma: no cover
    from monotonic import monotonic

from .common import _Future, MAX_TIMEOUT, copy_future_exception
from .wrap import CanCustomizeBind
from .helpers import executor_loop
from .event import get_event, is_shutdown
from .logwrap import LogWrapper
from .helpers import ShutdownHelper
from .metrics import metrics, track_future


class RetryPolicy(object):
    """Base class for objects which tell a :class:`RetryExecutor` whether
    (and after how long) a completed future should be attempted again.

    The implementation here never retries and never delays; subclass it,
    or use :class:`ExceptionRetryPolicy`, to get useful retry behavior."""

    def should_retry(self, attempt, future):
        """Decide whether another attempt should be made.

        Parameters:
            attempt (int): how many attempts have been made so far; the first
                           attempt is numbered 1
            future (~concurrent.futures.Future): the completed future for the
                           most recent attempt

        Returns:
            bool:
                True when the future should be retried, False otherwise.
                This base implementation always returns False.
        """
        return False

    def sleep_time(self, attempt, future):
        """Decide how long to wait before the next attempt.

        Parameters:
            attempt (int): how many attempts have been made so far; the first
                           attempt is numbered 1
            future (~concurrent.futures.Future): the completed future for the
                           most recent attempt

        Returns:
            float:
                Delay in seconds before the next attempt is started.
                This base implementation always returns 0.
        """
        return 0


class ExceptionRetryPolicy(RetryPolicy):
    """Retries on any exceptions under the given base class(es),
    up to a fixed number of attempts, with an exponential
    backoff between each attempt."""

    def __init__(self, **kwargs):
        """
        Parameters:
            max_attempts (int): maximum number of times a callable should be attempted
            exponent (float): exponent used for backoff, e.g. 2.0 will result in a delay
                              which doubles between each attempt
            sleep (float): base value for delay between attempts in seconds, e.g. 1.0
                           will delay by one second between first two attempts
            max_sleep (float): maximum delay between attempts, in seconds
            exception_base (type, list(type)): future will be retried if (and only if)
                            it raises an exception inheriting from one of these classes

        All parameters are optional; reasonable defaults apply when omitted.
        """
        self._max_attempts = kwargs.get("max_attempts", 3)
        self._exponent = kwargs.get("exponent", 2.0)
        self._sleep = kwargs.get("sleep", 1.0)
        self._max_sleep = kwargs.get("max_sleep", 120)
        self._exception_base = kwargs.get("exception_base", Exception)
        # Normalize a single class to a list so should_retry can always iterate.
        if isinstance(self._exception_base, type):
            self._exception_base = [self._exception_base]

    def should_retry(self, attempt, future):
        """Return True if the future failed with a retryable exception and
        the attempt limit has not been reached.

        Parameters:
            attempt (int): number of times the future has been attempted; starts counting at 1
            future (~concurrent.futures.Future): a completed future

        Returns:
            bool:
                False if the future has no exception, if ``attempt`` has reached
                ``max_attempts``, or if the exception is not an instance of any
                configured base class; True otherwise.
        """
        exception = future.exception()
        if not exception:
            return False
        if attempt >= self._max_attempts:
            return False
        return any(isinstance(exception, klass) for klass in self._exception_base)

    def sleep_time(self, attempt, future):
        """Return the backoff delay before the next attempt.

        The delay is ``sleep * exponent ** (attempt - 1)``, capped at ``max_sleep``.

        Parameters:
            attempt (int): number of times the future has been attempted; starts counting at 1
            future (~concurrent.futures.Future): a completed future (unused)

        Returns:
            float:
                Delay in seconds, never more than ``max_sleep``.
        """
        try:
            delay = self._sleep * (self._exponent ** (attempt - 1))
        except OverflowError:
            # With a large attempt count the exponential term no longer fits
            # in a float (e.g. 2.0 ** 1024).  The uncapped delay is far beyond
            # max_sleep by then, so the cap is the right answer; raising here
            # would make the caller give up on retrying altogether.
            return self._max_sleep
        return min(delay, self._max_sleep)


class RetryJob(object):
    """Record describing one pending or in-flight attempt at a callable.

    Instances are plain attribute holders; every constructor argument is
    stored under an attribute of the same name.  ``stop_retry`` always
    starts out False.
    """

    def __init__(
        self,
        policy,
        delegate_future,
        future,
        attempt,
        when,
        fn,
        args,
        kwargs,
        old_delegate=None,
    ):
        # The callable and the arguments it is invoked with
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Retry bookkeeping: governing policy, attempt counter, and the
        # time value associated with the next attempt
        self.policy = policy
        self.attempt = attempt
        self.when = when

        # Futures: the one handed to the caller, the delegate's future for
        # the current attempt, and optionally a delegate future from an
        # earlier attempt
        self.future = future
        self.delegate_future = delegate_future
        self.old_delegate = old_delegate

        # Flag checked elsewhere to suppress any further retries
        self.stop_retry = False


class RetryFuture(_Future):
    """Future handed out by :class:`RetryExecutor`.

    It remembers the executor which created it (until it completes) and the
    delegate executor's future for the attempt currently in progress, if any.
    """

    def __init__(self, executor):
        super(RetryFuture, self).__init__()
        self._executor = executor
        self.delegate_future = None
        # Drop the reference to the executor as soon as this future is done
        self.add_done_callback(self._clear_executor)

    def running(self):
        """Return True if this future is not done and its delegate future
        is either running or already done."""
        with self._me_lock:
            if not self.done():
                delegate = self.delegate_future
                if delegate:
                    # A delegate which is "done" while we are not means the
                    # delegate finished but its callbacks are still in
                    # progress (which may yet schedule a retry), so that
                    # counts as running too.
                    return delegate.running() or delegate.done()
        return False

    def _clear_delegate(self):
        """Forget the delegate future for the current attempt."""
        with self._me_lock:
            self.delegate_future = None

    @classmethod
    def _clear_executor(cls, future):
        """Done-callback: forget the executor which owns ``future``."""
        with future._me_lock:
            future._executor = None

    def __terminate_via(self, method, *args, **kwargs):
        # Common path for every way of completing this future: under the
        # future's lock, drop the delegate and apply the given setter; once
        # the lock is released, run the callbacks.
        with self._me_lock:
            self._clear_delegate()
            method(*args, **kwargs)
        self._me_invoke_callbacks()

    def set_result(self, result):
        parent_setter = super(RetryFuture, self).set_result
        self.__terminate_via(parent_setter, result)

    def set_exception(self, exception):
        parent_setter = super(RetryFuture, self).set_exception
        self.__terminate_via(parent_setter, exception)

    def set_exception_info(self, exception, traceback):
        # For python2 compat.
        # pylint: disable=no-member
        parent_setter = super(RetryFuture, self).set_exception_info
        self.__terminate_via(parent_setter, exception, traceback)

    def _me_cancel(self):
        """Ask the owning executor to cancel this future.

        Returns the executor's answer, or the (falsy) executor reference
        itself when there is no executor any more."""
        owner = self._executor
        if not owner:
            return owner
        return owner._cancel(self)


class RetryExecutor(CanCustomizeBind, Executor):
    """An executor which delegates to another executor while adding
    implicit retry behavior.

    - Callables are submitted to the delegate executor, and may be
      submitted more than once if retries are required.

    - The callables may be submitted to that executor from a different
      thread than the calling thread.

    - Cancelling is supported if the delegate executor allows it.

    - Cancelling between retries is always supported.

    - Attempting to cancel a future prevents any more retries, regardless
      of whether the cancel succeeds.

    - The returned futures from this executor are only resolved or
      failed once the callable either succeeded, or all retries
      were exhausted.  This includes activation of the done callback.

    - This executor is thread-safe.
    """

    def __init__(
        self, delegate, retry_policy=None, logger=None, name="default", **kwargs
    ):
        """
        Parameters:

            delegate (~concurrent.futures.Executor):
                executor to which callables will be submitted

            retry_policy (RetryPolicy):
                policy used to determine when futures shall be retried; if omitted,
                an :class:`ExceptionRetryPolicy` is used, supplied with keyword
                arguments to this constructor

            logger (~logging.Logger):
                a logger used for messages from this executor

            name (str):
                a name for this executor

        .. versionchanged:: 2.7.0
            Introduced ``name``.
        """
        self._log = LogWrapper(logger if logger else logging.getLogger("RetryExecutor"))
        self._delegate = delegate
        self._default_retry_policy = retry_policy or ExceptionRetryPolicy(**kwargs)
        self._jobs = []
        self._submit_event = get_event()
        self._name = name

        # The submit thread is only given a weak reference to this executor.
        # If the executor is garbage collected, the weakref callback sets the
        # event so that the thread wakes up and can notice the executor is gone.
        event = self._submit_event
        self_ref = weakref.ref(self, lambda _: event.set())
        self._submit_thread = Thread(
            name="RetryExecutor-%s" % name, target=_submit_loop, args=(self_ref,)
        )
        self._submit_thread.daemon = True

        self._shutdown = ShutdownHelper()
        self._lock = RLock()

        metrics.EXEC_INPROGRESS.labels(executor=self._name, type="retry").inc()
        metrics.EXEC_TOTAL.labels(executor=self._name, type="retry").inc()

        self._submit_thread.start()

    def shutdown(self, wait=True, **_kwargs):
        """Shut down this executor and the delegate executor.

        Parameters:
            wait (bool): passed to the delegate's ``shutdown``; if true, the
                         submit thread is also joined before returning
            _kwargs: any further keyword arguments are passed through to the
                     delegate's ``shutdown``
        """
        if self._shutdown():
            self._log.debug("Shutting down.")
            metrics.EXEC_INPROGRESS.labels(executor=self._name, type="retry").dec()
            self._wake_thread()
            self._delegate.shutdown(wait, **_kwargs)
            if wait:
                self._log.debug("Waiting for thread")
                self._submit_thread.join(MAX_TIMEOUT)
            self._log.debug("Shutdown complete")

    def submit(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """Submit a callable using this executor's default retry policy."""
        return self.submit_retry(self._default_retry_policy, *args, **kwargs)

    def submit_retry(self, retry_policy, fn, *args, **kwargs):
        """Submit a callable with a specific retry policy.

        Parameters:
            retry_policy (RetryPolicy): a policy which is used for this call only
            fn (callable): the callable to run
            args, kwargs: positional and keyword arguments passed to ``fn``

        Returns:
            RetryFuture: a future for the eventual outcome of ``fn``
        """
        with self._shutdown.ensure_alive():
            future = RetryFuture(self)
            track_future(future, type="retry", executor=self._name)

            # Attempt 0 with when == now: the job is immediately eligible
            # for submission by the submit thread.
            job = RetryJob(retry_policy, None, future, 0, monotonic(), fn, args, kwargs)
            self._append_job(job)

            # Let the submit thread know it should wake up to check for new jobs
            self._wake_thread()

            self._log.debug("Returning future %s", future)
            return future

    def _wake_thread(self):
        # Signal the submit thread to re-examine the job list.
        self._submit_event.set()

    def _get_next_job(self):
        # Find and return the next job to be handled, if any.
        # This means a job with when <= now, or with stop_retry == True,
        # or if there's none, then the job with the minimum value of when.
        # Jobs which currently have a delegate future are never returned.
        min_job = None
        now = monotonic()
        for job in self._jobs:
            if job.delegate_future:
                # It's already running, skip
                continue
            if job.stop_retry:
                # We've been requested to stop retrying this job,
                # and we can handle that immediately
                return job
            if job.when <= now:
                # job is overdue, just do it
                return job
            if not min_job:
                min_job = job
            elif job.when < min_job.when:
                min_job = job
        return min_job

    def _submit_now(self, job):
        # Submit the job's callable to the delegate and replace the job with
        # a new one which tracks the resulting delegate future.
        #
        # Pop job since we'll replace it.
        # We need to hold the lock for the entire duration so that other
        # threads won't see _jobs between our removal and re-add of the job
        with job.future._me_lock:
            with self._lock:
                self._pop_job(job)

                # We need the future's lock now too, because someone could
                # call cancel after this check and before we submit.
                if job.future.done():
                    self._log.debug(
                        "future done %s - not submitting to delegate", job.future
                    )
                    return

                # Any submission after the first one counts as a retry
                if job.attempt != 0:
                    metrics.RETRY_TOTAL.labels(executor=self._name).inc()

                delegate_future = self._delegate.submit(job.fn, *job.args, **job.kwargs)
                job.future.delegate_future = delegate_future

                new_job = RetryJob(
                    job.policy,
                    delegate_future,
                    job.future,
                    job.attempt + 1,
                    None,
                    job.fn,
                    job.args,
                    job.kwargs,
                )
                self._append_job(new_job)
                self._log.debug("Submitted: %s", new_job)

        # Installed only after both locks are released.
        delegate_future.add_done_callback(self._delegate_callback)
        self._wake_thread()

    def _pop_job(self, job):
        # Remove the given job (matched by identity) from the job list and
        # decrement the queue metric.  Returns the removed job, or None if
        # the job is not in the list.
        with self._lock:
            for idx, pending in enumerate(self._jobs):
                if pending is job:
                    metrics.RETRY_QUEUE.labels(executor=self._name).dec()
                    return self._jobs.pop(idx)

    def _append_job(self, job):
        # Add a job to the job list and increment the queue metric.
        with self._lock:
            metrics.RETRY_QUEUE.labels(executor=self._name).inc()
            self._jobs.append(job)

    def _retry(self, job, sleep_time):
        # Replace a job whose delegate future has completed with a new job
        # (no delegate future) which becomes eligible after sleep_time seconds.
        self._log.debug("Will retry: %s", job)

        with self._lock:
            self._pop_job(job)
            metrics.RETRY_DELAY.labels(executor=self._name).inc(sleep_time)
            new_job = RetryJob(
                job.policy,
                None,
                job.future,
                job.attempt,
                monotonic() + sleep_time,
                job.fn,
                job.args,
                job.kwargs,
                # The old delegate future is retained in case we need
                # to propagate its exception later
                old_delegate=job.delegate_future,
            )
            new_job.stop_retry = job.stop_retry
            self._append_job(new_job)

        self._wake_thread()

    def _cancel(self, future):
        # Try to cancel the job belonging to the given RetryFuture; invoked
        # from RetryFuture._me_cancel.  Returns True if the job was cancelled
        # (either it had no delegate future yet, or the delegate future
        # accepted the cancel), and False otherwise.
        found_job = None

        with self._lock:
            for job in self._jobs:
                if job.future is future:
                    self._log.debug("Try cancel: %s", job)

                    if not job.delegate_future:
                        self._log.debug("Successful cancel - no delegate: %s", job)
                        # Remove via _pop_job rather than popping from the
                        # list directly, so the RETRY_QUEUE metric is
                        # decremented like on every other removal.
                        self._pop_job(job)
                        return True

                    found_job = job

                    # Whether or not we can successfully cancel, the request to cancel
                    # means that we don't want to retry any more.
                    found_job.stop_retry = True

                    break

        # This shouldn't be possible.
        # - Future holds a lock on itself, and has checked that it's not already done
        # - The only other path for removing a job is in delegate_callback, but the
        #   job is only removed *after* set_result/set_exception which would wait
        #   for the future's lock.
        assert found_job, "Cancel called on orphan %s" % future

        self._log.debug("Try cancel delegate: %s", found_job)

        if found_job.delegate_future.cancel():
            self._log.debug("Successful cancel: %s", found_job)
            future._clear_delegate()
            # The delegate callback does nothing for a cancelled delegate,
            # so the job has to be removed here; otherwise it would stay in
            # _jobs (and in the RETRY_QUEUE metric) indefinitely.
            self._pop_job(found_job)
            return True

        self._log.debug("Could not cancel: %s", found_job)

        # Let the submit thread wake up and find that we've set stop_retry
        self._wake_thread()

        return False

    def _delegate_callback(self, delegate_future):
        # Done-callback attached to every delegate future: either schedules
        # a retry, or resolves the RetryFuture with the delegate's outcome.
        assert delegate_future.done(), "BUG: callback invoked while future not done!"

        self._log.debug("Callback activated for %s", delegate_future)

        if delegate_future.cancelled():
            # nothing to do, retrying on cancel is not allowed.
            # This is checked before looking up the job, because _cancel may
            # already have removed the job by the time this callback runs.
            self._log.debug("Delegate was cancelled: %s", delegate_future)
            return

        found_job = None
        for job in self._jobs[:]:
            if job.delegate_future == delegate_future:
                found_job = job
                break

        # Callbacks are only installed after a job is added, and for a
        # delegate which was not cancelled this is the only place a job with
        # a delegate associated will be removed, thus it should not be
        # possible for a job to be missing.
        assert found_job, "BUG: no job associated with delegate %s" % delegate_future

        (should_retry, sleep_time) = eval_policy(found_job, self._log)

        if should_retry:
            self._retry(found_job, sleep_time)
            return

        self._log.debug("Finalizing %s", found_job)

        # OK, it won't be retried.  Resolve the future.
        copy_future(delegate_future, found_job.future)

        self._pop_job(found_job)

        self._log.debug("Finalized %s", found_job)


def copy_future(f1, f2):
    """Resolve future ``f2`` with the outcome of the completed future ``f1``.

    If ``f1`` holds an exception, it is transferred using
    ``copy_future_exception``; otherwise ``f1``'s result is set on ``f2``.
    """
    if f1.exception():
        copy_future_exception(f1, f2)
        return

    # No exception: fetch the result first, then hand it over.
    outcome = f1.result()
    f2.set_result(outcome)


def eval_policy(job, logger):
    """Ask a job's retry policy whether the job should be retried.

    Parameters:
        job: object providing ``stop_retry``, ``policy``, ``attempt`` and
             ``delegate_future`` attributes
        logger: object whose ``exception`` method is used to report a
                failure inside the policy

    Returns:
        tuple:
            ``(should_retry, sleep_time)``.  This is ``(False, None)`` if the
            job has ``stop_retry`` set or if the policy raised an exception;
            ``sleep_time`` is None whenever the policy declines to retry.
    """
    no_retry = (False, None)

    if job.stop_retry:
        return no_retry

    policy = job.policy

    try:
        retry = policy.should_retry(job.attempt, job.delegate_future)
        # The policy is only asked for a delay when it wants a retry
        delay = policy.sleep_time(job.attempt, job.delegate_future) if retry else None
    except Exception:
        logger.exception("Exception while evaluating retry policy %r", policy)
        return no_retry

    return (retry, delay)


@executor_loop
def _submit_loop(executor_ref):
    """Body of the submit thread of a RetryExecutor.

    Repeatedly picks the next job from the executor and either submits it to
    the delegate executor, finalizes it (if further retries were stopped), or
    waits until the job becomes ready or the executor's submit event is set.

    Parameters:
        executor_ref: callable returning the executor, or a falsy value once
                      the executor no longer exists (a weak reference)

    The loop ends when the executor is gone, when the executor has been shut
    down, or when ``is_shutdown()`` returns true.
    """
    while True:
        # Re-resolve the executor on every iteration; it may have gone away
        # while this thread was waiting.
        executor = executor_ref()
        if not executor:
            break

        if executor._shutdown.is_shutdown or is_shutdown():
            break

        executor._log.debug("_submit_loop iter")

        with executor._lock:
            job = executor._get_next_job()

        if not job:
            executor._log.debug("No jobs at all. Waiting...")
            event = executor._submit_event
            # Drop the strong reference before blocking - presumably so that
            # this thread does not keep the executor alive while it waits.
            del executor
            _submit_wait(event)
            continue

        if job.stop_retry:
            # No more retries are wanted for this job: remove it and resolve
            # its future from the delegate future of the previous attempt.
            executor._log.debug("Discarding job due to cancel: %s", job)
            executor._pop_job(job)
            copy_future(job.old_delegate, job.future)
            continue

        now = monotonic()
        if job.when <= now:
            # Can submit immediately and check for next job
            executor._submit_now(job)
            continue

        # There is nothing to submit immediately.
        # Sleep until either:
        # - reaching the time of the nearest job, or...
        # - woken up by the submit event being set
        delta = job.when - now
        executor._log.debug("No ready job.  Waiting: %s", delta)
        event = executor._submit_event
        # As above, release local references to the executor and the job
        # before blocking.
        del executor
        del job
        _submit_wait(event, delta)


def _submit_wait(event, timeout=None):
    """Wait on ``event`` (passing ``timeout`` through to its ``wait``), then
    clear it so that a later ``set()`` can be waited on again.

    The event is cleared whether or not it was set when ``wait`` returned.
    """
    event.wait(timeout)
    event.clear()