wrapt_timeout_decorator/wrap_function_multiprocess.py
# STDLIB
from multiprocessing import ProcessError
import signal
import sys
import time
from typing import Any, Optional
# EXT
import multiprocess # type: ignore
# OWN
try:
from .wrap_helper import WrapHelper, raise_exception
except ImportError: # pragma: no cover
# Import for local DocTest
from wrap_helper import WrapHelper, raise_exception # type: ignore # pragma: no cover
class Timeout(object):
"""Wrap a function and add a timeout (limit) attribute to it.
Instances of this class are automatically generated by the add_timeout
function defined above. Wrapping a function allows asynchronous calls
to be made and termination of execution after a timeout has passed.
"""
def __init__(self, wrap_helper: WrapHelper) -> None:
"""Initialize instance in preparation for being called."""
self.wrap_helper = wrap_helper
self.__name__ = self.wrap_helper.wrapped.__name__
self.__doc__ = self.wrap_helper.wrapped.__doc__
self.__process = None # type: multiprocess.Process
self.__parent_conn = None # type: multiprocess.Pipe
self.__subprocess_start_time: float = 0.0
self.__subprocess_max_end_time: float = 0.0
self.__sleeping_time: float = 0.0
self.__reset_signals = wrap_helper.dec_mp_reset_signals
def __call__(self) -> Any:
"""Execute the embedded function object asynchronously.
The function given to the constructor is transparently called and
requires that "ready" be intermittently polled. If and when it is
True, the "value" property may then be checked for returned data.
"""
self.__parent_conn, self.wrap_helper.child_conn = multiprocess.Pipe(duplex=False)
self.__process = multiprocess.Process(target=_target, args=[self.wrap_helper])
# daemonic process must not have subprocess - we need that for nested decorators
self.__process.daemon = False
self.__process.start()
if not self.wrap_helper.dec_hard_timeout:
self.wait_until_process_started()
if self.wrap_helper.dec_poll_subprocess:
# here we poll for the result and check if the subprocess is still alive
# long living subprocesses might get killed by oop killer and some users want to
# return immediately if that happens
self.__subprocess_start_time = time.time()
self.__subprocess_max_end_time = self.__subprocess_start_time + self.wrap_helper.dec_timeout_float
self.adjust_sleeping_time()
while True:
if self.is_result_ready_on_the_pipe_with_timeout(timeout_seconds=self.__sleeping_time):
return self.value
if not self.__process.is_alive():
self.cancel_subprocess_was_killed()
self.adjust_sleeping_time()
if self.time_is_over():
self.cancel()
else:
# here we just wait for the result and dont poll if the subprocess is actually alive
if self.is_result_ready_on_the_pipe_with_timeout(timeout_seconds=self.wrap_helper.dec_timeout_float):
return self.value
else:
self.cancel()
def is_result_ready_on_the_pipe(self) -> bool:
""" check if there is a result on the pipe, with timeout
see also : https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
If timeout_seconds is not specified then it will return immediately.
If timeout is a number then this specifies the maximum time in seconds to block.
If timeout is None then an infinite timeout is used.
"""
return bool(self.__parent_conn.poll())
def is_result_ready_on_the_pipe_with_timeout(self, timeout_seconds: Optional[float]) -> bool:
""" check if there is a result on the pipe, with timeout
see also : https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
If timeout_seconds is not specified then it will return immediately.
If timeout is a number then this specifies the maximum time in seconds to block.
If timeout is None then an infinite timeout is used.
"""
return bool(self.__parent_conn.poll(timeout_seconds))
def time_is_over(self) -> bool:
""" returns True if the Time is over """
return time.time() >= self.__subprocess_max_end_time
def adjust_sleeping_time(self) -> None:
""" adjust sleeping time, not to sleep longer as the timeout allows """
if time.time() + self.wrap_helper.dec_poll_subprocess > self.__subprocess_max_end_time:
self.__sleeping_time = self.__subprocess_max_end_time - time.time()
else:
self.__sleeping_time = self.wrap_helper.dec_poll_subprocess
if self.__sleeping_time < 0:
self.__sleeping_time = 0
def cancel(self) -> None:
"""Terminate any possible execution of the embedded function."""
if self.__process.is_alive(): # pragma: no cover # we can not produce that state - its just a security measure
self.__process.terminate()
self.__process.join(timeout=1.0)
self.__parent_conn.close()
raise_exception(self.wrap_helper.timeout_exception, self.wrap_helper.exception_message)
def cancel_subprocess_was_killed(self) -> None:
self.__process.join(timeout=1.0)
self.__parent_conn.close()
subprocess_run_time = time.time() - self.__subprocess_start_time
self.wrap_helper.format_subprocess_exception_message(subprocess_run_time=subprocess_run_time)
raise_exception(ProcessError, self.wrap_helper.exception_message)
def wait_until_process_started(self) -> None:
self.__parent_conn.recv()
@property
def value(self) -> Any:
exception_occured, result = self.__parent_conn.recv()
# when self.__parent_conn.recv() exits, maybe __process is still alive,
# then it might zombie the process. so join it explicitly
self.__process.join(timeout=1.0)
self.__parent_conn.close()
if exception_occured:
raise result
else:
return result
def _target(wrap_helper: WrapHelper) -> None:
"""Run a function with arguments and return output via a pipe.
This is a helper function for the Process created in Timeout. It runs
the function with positional arguments and keyword arguments and then
returns the function's output by way of a queue. If an exception is
raised, it is returned to Timeout to be raised by the value property.
"""
# noinspection PyBroadException
try:
if multiprocess.get_start_method() == 'fork' and wrap_helper.dec_mp_reset_signals:
_mp_reset_signals()
if not wrap_helper.dec_hard_timeout:
wrap_helper.child_conn.send("started")
exception_occured = False
wrap_helper.child_conn.send((exception_occured, wrap_helper.wrapped(*wrap_helper.args, **wrap_helper.kwargs)))
except Exception:
exception_occured = True
wrap_helper.child_conn.send((exception_occured, sys.exc_info()[1]))
finally:
wrap_helper.child_conn.close()
def _mp_reset_signals() -> None:
""" if the process is using wakeup_fd, the child process will inherit the file descriptor
when we are sending a signal to the child, it goes to this opened socket.
but the parent process listens also to this socket, so it receives a signal to terminate and shut down the application.
therefore we need to return the default behavior of signal handlers for child process and don't use the inherited fd from the parent;
"""
signal.set_wakeup_fd(-1)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)