bitranox/wrapt_timeout_decorator
wrapt_timeout_decorator/wrap_function_multiprocess.py

# STDLIB
from multiprocessing import ProcessError
import signal
import sys
import time
from typing import Any, Optional

# EXT
import multiprocess  # type: ignore

# OWN
try:
    from .wrap_helper import WrapHelper, raise_exception
except ImportError:  # pragma: no cover
    # Import for local DocTest
    from wrap_helper import WrapHelper, raise_exception  # type: ignore # pragma: no cover


class Timeout(object):
    """Wrap a function and add a timeout (limit) attribute to it.
    Instances of this class are automatically generated by the add_timeout
    function defined above. Wrapping a function allows asynchronous calls
    to be made and termination of execution after a timeout has passed.
    """

    def __init__(self, wrap_helper: WrapHelper) -> None:
        """Initialize instance in preparation for being called."""
        self.wrap_helper = wrap_helper
        self.__name__ = self.wrap_helper.wrapped.__name__
        self.__doc__ = self.wrap_helper.wrapped.__doc__
        self.__process = None  # type: multiprocess.Process
        self.__parent_conn = None  # type: multiprocess.Pipe
        self.__subprocess_start_time: float = 0.0
        self.__subprocess_max_end_time: float = 0.0
        self.__sleeping_time: float = 0.0
        self.__reset_signals = wrap_helper.dec_mp_reset_signals

    def __call__(self) -> Any:
        """Execute the embedded function object asynchronously.
        The function given to the constructor is transparently called and
        requires that "ready" be intermittently polled. If and when it is
        True, the "value" property may then be checked for returned data.
        """
        self.__parent_conn, self.wrap_helper.child_conn = multiprocess.Pipe(duplex=False)
        self.__process = multiprocess.Process(target=_target, args=[self.wrap_helper])
        # a daemonic process must not have child processes of its own - we need those for nested decorators
        self.__process.daemon = False
        self.__process.start()
        if not self.wrap_helper.dec_hard_timeout:
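            # handshake: block until the child sends "started" over the pipe
            # (see _target below), so that subprocess startup overhead does
            # not count against the timeout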
            self.wait_until_process_started()

        if self.wrap_helper.dec_poll_subprocess:
            # here we poll for the result and check if the subprocess is still alive.
            # long-living subprocesses might get killed by the OOM killer, and some users
            # want to return immediately if that happens
            self.__subprocess_start_time = time.time()
            self.__subprocess_max_end_time = self.__subprocess_start_time + self.wrap_helper.dec_timeout_float
            self.adjust_sleeping_time()

            while True:
                if self.is_result_ready_on_the_pipe_with_timeout(timeout_seconds=self.__sleeping_time):
                    return self.value
                if not self.__process.is_alive():
                    self.cancel_subprocess_was_killed()
                self.adjust_sleeping_time()
                if self.time_is_over():
                    self.cancel()

        else:
            # here we just wait for the result and don't poll whether the subprocess is still alive
            if self.is_result_ready_on_the_pipe_with_timeout(timeout_seconds=self.wrap_helper.dec_timeout_float):
                return self.value
            else:
                self.cancel()

    def is_result_ready_on_the_pipe(self) -> bool:
        """ check if there is a result on the pipe, with timeout
        see also : https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection

        If timeout_seconds is not specified then it will return immediately.
        If timeout is a number then this specifies the maximum time in seconds to block.
        If timeout is None then an infinite timeout is used.
        """
        return bool(self.__parent_conn.poll())

    def is_result_ready_on_the_pipe_with_timeout(self, timeout_seconds: Optional[float]) -> bool:
        """ check if there is a result on the pipe, with timeout
        see also : https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection

        If timeout_seconds is a number, it specifies the maximum time in seconds to block.
        If timeout_seconds is None, an infinite timeout is used.
        """
        return bool(self.__parent_conn.poll(timeout_seconds))

    def time_is_over(self) -> bool:
        """ returns True if the Time is over """
        return time.time() >= self.__subprocess_max_end_time

    def adjust_sleeping_time(self) -> None:
        """ adjust sleeping time, not to sleep longer as the timeout allows """
        if time.time() + self.wrap_helper.dec_poll_subprocess > self.__subprocess_max_end_time:
            self.__sleeping_time = self.__subprocess_max_end_time - time.time()
        else:
            self.__sleeping_time = self.wrap_helper.dec_poll_subprocess

        if self.__sleeping_time < 0:
            self.__sleeping_time = 0
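
        # worked example (hypothetical numbers): with dec_poll_subprocess = 0.5 s
        # and only 0.2 s left until __subprocess_max_end_time, the next sleep is
        # capped at 0.2 s; once the deadline has passed the value would become
        # negative, so it is clamped to 0 and time_is_over() triggers cancel()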

    def cancel(self) -> None:
        """Terminate any possible execution of the embedded function."""
        if self.__process.is_alive():  # pragma: no cover      # we cannot produce that state in tests - it is just a safety measure
            self.__process.terminate()
        self.__process.join(timeout=1.0)
        self.__parent_conn.close()
        raise_exception(self.wrap_helper.timeout_exception, self.wrap_helper.exception_message)

    def cancel_subprocess_was_killed(self) -> None:
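        """ raise a ProcessError because the subprocess died (for instance killed
        by the OOM killer) before it could deliver a result """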
        self.__process.join(timeout=1.0)
        self.__parent_conn.close()
        subprocess_run_time = time.time() - self.__subprocess_start_time
        self.wrap_helper.format_subprocess_exception_message(subprocess_run_time=subprocess_run_time)
        raise_exception(ProcessError, self.wrap_helper.exception_message)

    def wait_until_process_started(self) -> None:
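        """ block until the child process sends its "started" handshake over the pipe (see _target) """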
        self.__parent_conn.recv()

    @property
    def value(self) -> Any:
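        """ receive the (exception_occurred, result) tuple from the child and
        return the result or re-raise the exception """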
        exception_occurred, result = self.__parent_conn.recv()
        # when self.__parent_conn.recv() returns, __process may still be alive
        # and would be left as a zombie - so join it explicitly to reap it
        self.__process.join(timeout=1.0)
        self.__parent_conn.close()

        if exception_occurred:
            raise result
        else:
            return result


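# Pipe protocol between Timeout (parent) and _target (child), as implemented above:
#   1. optional handshake: unless dec_hard_timeout is set, the child first sends
#      the string "started" so the parent knows the subprocess is up
#   2. result: the child sends a single (exception_occurred, payload) tuple where
#      payload is either the wrapped function's return value or the raised exception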
def _target(wrap_helper: WrapHelper) -> None:
    """Run a function with arguments and return output via a pipe.
    This is a helper function for the Process created in Timeout. It runs
    the function with positional arguments and keyword arguments and then
    returns the function's output by way of a queue. If an exception is
    raised, it is returned to Timeout to be raised by the value property.
    """
    # noinspection PyBroadException
    try:
        if multiprocess.get_start_method() == 'fork' and wrap_helper.dec_mp_reset_signals:
            _mp_reset_signals()
        if not wrap_helper.dec_hard_timeout:
            wrap_helper.child_conn.send("started")
        exception_occurred = False
        wrap_helper.child_conn.send((exception_occurred, wrap_helper.wrapped(*wrap_helper.args, **wrap_helper.kwargs)))
    except Exception:
        exception_occurred = True
        wrap_helper.child_conn.send((exception_occurred, sys.exc_info()[1]))
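        # note: the exception instance is serialized when sent over the pipe and
        # re-raised in the parent by the value property above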

    finally:
        wrap_helper.child_conn.close()


def _mp_reset_signals() -> None:
    """ if the process is using wakeup_fd, the child process will inherit the file descriptor
    when we are sending a signal to the child, it goes to this opened socket.
    but the parent process listens also to this socket, so it receives a signal to terminate and shut down the application.
    therefore we need to return the default behavior of signal handlers for child process and don't use the inherited fd from the parent;
    """
    signal.set_wakeup_fd(-1)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGINT, signal.SIG_DFL)
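

# Hedged illustration (not part of the module) of the inherited wakeup-fd
# problem the reset above avoids, assuming the 'fork' start method on POSIX:
#
#     import signal, socket
#     r, w = socket.socketpair()
#     w.setblocking(False)
#     signal.set_wakeup_fd(w.fileno())  # parent registers a wakeup fd
#
# a forked child inherits this fd; without _mp_reset_signals, a SIGTERM sent to
# the child would also be written to the shared socket pair, waking the parent's
# signal machinery and potentially shutting down the whole application.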