fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/common/sqs/sqs_work_dispatcher.py

Summary

Maintainability
D
2 days
Test Coverage
D
69%
import logging
import inspect
import json
import os

import psutil as ps
import signal
import time
import multiprocessing as mp

from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError, NoRegionError

from usaspending_api.common.sqs.queue_exceptions import (
    QueueWorkDispatcherError,
    QueueWorkerProcessError,
    ExecutionTimeout,
)
from usaspending_api.common.sqs.sqs_handler import get_sqs_queue
from usaspending_api.common.sqs.sqs_job_logging import log_dispatcher_message

# Not a complete list of signals
# NOTE: 1-15 are relatively standard on unix platforms; > 15 can change from platform to platform
BSD_SIGNALS = {
    1: "SIGHUP [1] (Hangup detected on controlling terminal or death of controlling process)",
    2: "SIGINT [2] (Interrupt from keyboard)",
    3: "SIGQUIT [3] (Quit from keyboard)",
    4: "SIGILL [4] (Illegal Instruction)",
    6: "SIGABRT [6] (Abort signal from abort(3))",
    8: "SIGFPE [8] (Floating point exception)",
    9: "SIGKILL [9] (Non-catchable, non-ignorable kill)",
    11: "SIGSEGV [11] (Invalid memory reference)",
    14: "SIGALRM [14] (Timer signal from alarm(2))",
    15: "SIGTERM [15] (Software termination signal)",
    19: "SIGSTOP [19] (Suspend process execution)",  # NOTE: 17 on Mac OSX, 19 on RHEL
    20: "SIGTSTP [20] (Interrupt from keyboard to suspend (CTRL-Z)",  # NOTE: 18 on Mac OSX, 20 on RHEL
}


class SQSWorkDispatcher:
    """SQSWorkDispatcher object that is used to pull work from an SQS queue, and then dispatch it to be
    executed on a child worker process.

    This has the benefit of the worker process terminating after completion of the work, which will clear all
    resources used during its execution, and return memory to the operating system. It also allows for periodically
    monitoring the progress of the worker process from the parent process, and extending the SQS
    VisibilityTimeout, so that the message is not prematurely terminated and/or moved to the Dead Letter Queue

    Attributes:
        EXIT_SIGNALS (List[int]): Delineates each of the signals we want to handle, which represent a case where
            the work being performed is prematurely halting, and the process will exit



    """

    EXIT_SIGNALS = [signal.SIGHUP, signal.SIGABRT, signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
    # NOTE: We are not handling signal.SIGSTOP or signal.SIGTSTP because those are suspensions.
    # There may be valid cases to suspend and then later resume the process.
    # It is even done here via multiprocessing.Process.suspend() to suspend during cleanup.
    # So we do NOT want to handle that

    def __init__(
        self,
        sqs_queue_instance,
        worker_process_name=None,
        default_visibility_timeout=60,
        long_poll_seconds=None,
        monitor_sleep_time=5,
        exit_handling_timeout=30,
        worker_can_start_child_processes=False,
    ):
        """
        Args:
            sqs_queue_instance (SQS.Queue): the SQS queue to get work from
            worker_process_name (str): the name to give to the worker process. It will use the name of the callable
                job to be executed if not provided
            default_visibility_timeout (int): how long until the message is made visible in the queue again,
               for other consumers. If it only allows 1 retry, this may end up directing it to the Dead Letter queue
               when the next consumer attempts to receive it.
            long_poll_seconds (int): if it should wait patiently for some time to receive a message when requesting.
               Defaults to None, which means it should honor the value configured on the SQS queue
            monitor_sleep_time (float): periodicity to check up on the status of the worker process
            exit_handling_timeout (int): expected window of time during which cleanup should complete (not
                guaranteed). This for example would be the time to finish cleanup before the messages is re-queued
                or DLQ'd
            worker_can_start_child_processes (bool): Controls the multiprocessing.Process daemon attribute. When
                ``daemon=True``, it would ensure that if the parent dispatcher dies, it will kill its child
                worker. However, from the docs "a daemonic process is not allowed to create child processes",
                because termination is not cascaded, so a terminated parent killing a direct child would leave
                orphaned grand-children. So if the worker process needs to start its own child processes in order to
                do its work, set this to ``True``, so that ``Process.daemon`` will be set to ``False``,
                allowing the creation of grandchild processes from the dispatcher's child worker process.
        """
        self._logger = logging.getLogger(__name__ + "." + self.__class__.__name__)
        self.sqs_queue_instance = sqs_queue_instance
        self.worker_process_name = worker_process_name
        self._default_visibility_timeout = default_visibility_timeout
        self._monitor_sleep_time = monitor_sleep_time
        self._exit_handling_timeout = exit_handling_timeout
        self._worker_can_start_child_processes = worker_can_start_child_processes
        self._current_sqs_message = None
        self._worker_process = None
        self._job_args = ()
        self._job_kwargs = {}
        self._exit_handler = None
        self._parent_dispatcher_pid = os.getppid()
        self._sqs_heartbeat_log_period_seconds = 15  # log the heartbeat extension of visibility at most this often
        self._long_poll_seconds = 0  # if nothing is set anywhere, it defaults to 0 (short-polling)

        if long_poll_seconds:
            self._long_poll_seconds = long_poll_seconds
        else:
            receive_message_wait_time_seconds = self.sqs_queue_instance.attributes.get("ReceiveMessageWaitTimeSeconds")
            if receive_message_wait_time_seconds:
                self._long_poll_seconds = int(receive_message_wait_time_seconds)

        # Flag used to prevent handling an exit signal more than once, if multiple signals come in succession
        # True when the parent dispatcher process is handling one of the signals in EXIT_SIGNALS, otherwise False
        self._handling_exit_signal = False
        # Flagged True when exit handling should lead to an exit of the parent dispatcher process
        self._dispatcher_exiting = False

        if self._monitor_sleep_time >= self._default_visibility_timeout:
            msg = (
                "_monitor_sleep_time must be less than _default_visibility_timeout. "
                "Otherwise job duplication can occur"
            )
            raise QueueWorkDispatcherError(
                msg, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            )

        # Map handler functions for each of the exit signals we want to handle on the parent dispatcher process
        for sig in self.EXIT_SIGNALS:
            signal.signal(sig, self._handle_exit_signal)

    @property
    def is_exiting(self):
        """bool: True when this parent dispatcher process has received a signal that will lead to the process
        exiting
        """
        return self._dispatcher_exiting

    @property
    def allow_retries(self):
        """bool: Determine from the provided queue if retries of messages should be performed

        If False, the message will not be returned to the queue for retries by other consumers
        """
        redrive_policy = self.sqs_queue_instance.attributes.get("RedrivePolicy")
        if not redrive_policy:
            return True  # Messages in queues without a redrive policy basically have endless retries
        redrive_json = json.loads(redrive_policy)
        retries = redrive_json.get("maxReceiveCount")
        return retries and retries > 1

    @property
    def message_try_attempt(self):
        """int: Give an approximation of which attempt this is to process this message

        Will return None if the message has not been received yet or if the receive count is not being tracked
        This can be used to determine if this is a retry of this message (if return value > 1)
        """
        if not self._current_sqs_message:
            return None
        # get the non-user-defined (queue-defined) attributes on the message
        q_msg_attr = self._current_sqs_message.attributes
        if q_msg_attr.get("ApproximateReceiveCount") is None:
            return None
        else:
            return int(q_msg_attr.get("ApproximateReceiveCount"))

    def _dispatch(self, job, worker_process_name=None, exit_handler=None, *job_args, **job_kwargs):
        """Dispatch work to be performed in a newly started worker process.

        Execute the callable job that will run in a separate child worker process. The worker process
        will be monitored so that heartbeats can be sent to SQS to allow it to keep going.

        Args:
            job (Callable): Callable to use as the target of a new child process
            worker_process_name (str): Name given to the newly created child process. If not already set,
                defaults to the name of the provided job callable
            exit_handler: a callable to be called when handling an :attr:`EXIT_SIGNALS` signal, giving the
                opportunity to perform cleanup before the process exits. When invoked, it gets the same args passed
                to it that were passed to the :param:`job` callable.
            job_args: Zero or many variadic args that are unnamed (not keyword) that can be
                passed to the callable job. NOTE: Passing args to the job this
                way can be done when needed, but it is preferred to use keyword-args through :param:`job_kwargs`
                to be more explicit what arg values are going to what params.
            job_kwargs: Zero or many variadic keyword-args that can be passed to the callable job

        Returns:
            bool: True if a message was found on the queue and dispatched to completion, without error. Otherwise it
                is an error condition and will raise an exception.

        Raises:
            AttributeError: If this is called before setting self._current_sqs_message
            Others: Exceptions raised by :meth:`_monitor_work_progress`
        """
        if self._current_sqs_message is None:
            raise AttributeError("Cannot dispatch work when there is no current message in self._current_sqs_message")
        self.worker_process_name = worker_process_name or self.worker_process_name or job.__name__
        log_dispatcher_message(
            self,
            message="Creating and starting worker process named [{}] to invoke callable [{}] "
            "with args={} and kwargs={}".format(self.worker_process_name, job, job_args, job_kwargs),
        )

        # Set the exit_handler function if provided, or reset to None
        # Also save the job_args, and job_kwargs, which will be passed to the exit_handler
        self._exit_handler = exit_handler
        self._job_args = job_args
        self._job_kwargs = job_kwargs

        # Use the 'fork' method to create a new child process.
        # This shares the same python interpreter and memory space and references as the parent process
        # A side-effect of that is that it inherits the signal-handlers of the parent process. So wrap the job to be
        # executed with some before-advice that resets the signal-handlers to python-defaults within the child process
        ctx = mp.get_context("fork")

        def signal_reset_wrapper(func, args, kwargs):
            # Reset signal handling to defaults in process that calls this
            for sig in self.EXIT_SIGNALS:
                signal.signal(sig, signal.SIG_DFL)
            # Then execute the given function with the given args
            func(*args, **kwargs)

        self._worker_process = ctx.Process(
            name=self.worker_process_name,
            target=signal_reset_wrapper,
            args=(job, job_args, job_kwargs),
            daemon=not self._worker_can_start_child_processes,
        )
        self._worker_process.start()
        log_dispatcher_message(
            self,
            message="Worker process named [{}] started with process ID [{}]".format(
                self.worker_process_name, self._worker_process.pid
            ),
            is_debug=True,
        )
        self._monitor_work_progress()
        return True

    def dispatch(
        self,
        job,
        *additional_job_args,
        message_transformer=lambda msg: msg.body,
        worker_process_name=None,
        exit_handler=None,
        **additional_job_kwargs,
    ):
        """Get work from the queue and dispatch it in a newly started worker process.

        Poll the queue for a message to work on. Any additional (not named in the signature) args or additional
        (not named in the signature) keyword-args will be combined with the args/kwargs provided by the
        message_transformer. This complete set of arguments is provided to the callable :param:`job` that will be
        executed in a separate worker process. The worker process will be monitored so that heartbeats can be
        sent to the SQS to allow it to keep going.

        Args:
            job (Callable[]): the callable to execute as work for the job
            additional_job_args: Zero or many variadic args that are unnamed (not keyword) that may be
                passed along with those from the message to the callable job. NOTE: Passing args to the job this
                way can be done when needed, but it is preferred to use keyword-args through
                :param:`additional_job_kwargs` to be more explicit what arg values are going to what params.
            message_transformer: A lambda to extract arguments to be passed to the callable from the queue message.
                By default, a function that just returns the message body is used. A provided transformer can
                return a single object as its arg, a tuple of args, or a dictionary of named args (which will
                be used as keyword-args in the job invocation). This is a keyword-only argument that can only be
                passed with its name: ``message_transformer=xxx``
            worker_process_name: Name given to the newly created child process. If not already set, defaults to
                the name of the provided job callable. This is a keyword-only argument that can only be
                passed with its name: ``worker_process_name=abc``
            exit_handler: a callable to be called when handling an :attr:`EXIT_SIGNALS` signal, giving the
                opportunity to perform cleanup before the process exits. When invoked, it gets the same args passed
                to it that were passed to the :param:`job` callable.
            additional_job_kwargs: Zero or many variadic keyword-args that may be passed along with those from
                the message to the callable job

        Examples:
            Given job function::

                def my_job(a, b, c, d):
                    pass  # do something

            The preferred way to dispatch it is with a message_transformer that returns a dictionary and,
            if necessary, additional args passed as keyword args::

                dispatcher.dispatch(my_job,
                                    message_transformer=lambda msg: {"a": msg.body, "b": db.get_org(msg.body)},
                                    c=some_tracking_id,
                                    d=datetime.datetime.now())

        Returns:
            bool: True if a message was found on the queue and dispatched, otherwise False if nothing on the queue

        Raises:
            SystemExit(1): If it can't connect to the queue or receive messages
            QueueWorkerProcessError: Under various conditions where the child worker process fails to run the job
            QueueWorkDispatcherError: Under various conditions where the parent dispatcher process can't
                orchestrate work execution, monitoring, or exit-signal-handling
        """
        self._dequeue_message(self._long_poll_seconds)
        if self._current_sqs_message is None:
            return False

        job_args = ()
        job_kwargs = additional_job_kwargs  # important that this be set as default, in case msg_args is not also a dict
        msg_args = message_transformer(self._current_sqs_message)
        if isinstance(msg_args, list) or isinstance(msg_args, tuple):
            job_args = tuple(msg_args) + additional_job_args
        elif isinstance(msg_args, dict):
            if msg_args.keys() & additional_job_kwargs.keys():
                raise KeyError(
                    "message_transformer produces named keyword args that are duplicative of those in "
                    "additional_job_kwargs and would be overwritten"
                )
            job_kwargs = {**msg_args, **additional_job_kwargs}
        else:
            job_args = (msg_args,) + additional_job_args

        return self._dispatch(job, worker_process_name, exit_handler, *job_args, **job_kwargs)

    def dispatch_by_message_attribute(
        self, message_transformer, *additional_job_args, worker_process_name=None, **additional_job_kwargs
    ):
        """Use a provided function to derive the callable job and its arguments from attributes within the queue
        message

        Args:
            message_transformer (Callable[[SQS.Message], dict]): A callable function that takes in the SQS
            message as its argument and returns a dict of::

                {
                    '_job': Callable,          # Required. The job to run
                    '_exit_handler': Callable  # Optional. A callable to be called when handling an
                                                    :attr:`EXIT_SIGNALS` signal, giving the opportunity to
                                                    perform cleanup before the process exits. Gets the ``job_args``
                                                    and any other items in this dict as args passed to it when run.
                    '_job_args': tuple,        # Optional. Partial or full collection of job args as a list or
                                                    tuple.
                    'named_job_arg1': Any,     # Optional. A named argument to be used as a keyword arg when
                                                    calling the job. ``named_job_arg1`` is representative, and the
                                                    actual names of the ``_job``'s params should be used here.
                                                    This is the preferred way to pass args to the ``_job``
                    'named_job_argN: Any       # Optional: Same as above. As many as are needed.
                }

            worker_process_name (str): Name given to the newly created child process. If not already set, defaults
                to the name of the provided job Callable
            additional_job_args: Zero or many variadic args that are unnamed (not keyword) that may be
                passed along with those from the message to the callable job. NOTE: Passing args to the job this
                way can be done when needed, but it is preferred to use keyword-args through
                :param:`additional_job_kwargs` to be more explicit what arg values are going to what params.
            additional_job_kwargs: Zero or many variadic keyword-args that may be passed along with those from
                the message to the callable job

        Examples:
            Given job function::

                def my_job(a, b, c, d):
                    pass  # do something

            The preferred way to dispatch it is with a ``message_transformer`` that returns a dictionary and,
            if necessary, additional args passed as keyword args::

                def work_routing_message_transformer(msg):
                    job = job_strategy_factory.from(msg.message_attributes["origin"]["StringValue"])
                    return {"_job": job,
                            "a": msg.body,
                            "b": db.get_org(msg.body)}

                dispatcher.dispatch_by_message_attribute(work_routing_message_transformer,
                                                         c=some_tracking_id,
                                                         d=datetime.datetime.now())

        Returns:
            bool: True if a message was found on the queue and dispatched, otherwise False if nothing on the queue

        Raises:
            SystemExit(1): If it can't connect to the queue or receive messages
            QueueWorkerProcessError: Under various conditions where the child worker process fails to run the job
            QueueWorkDispatcherError: Under various conditions where the parent dispatcher process can't
                orchestrate work execution, monitoring, or exit-signal-handling
        """
        self._dequeue_message(self._long_poll_seconds)
        if self._current_sqs_message is None:
            return False

        job_args = ()
        job_kwargs = {}
        results = message_transformer(self._current_sqs_message)

        def parse_message_transformer_results(_job, _exit_handler=None, _job_args=(), **_job_kwargs):
            """Use to map dictionary items to this functions params, and packs up remaining items in a separate
            dictionary. Then returns the organized data as a 4-tuple
            """
            return _job, _exit_handler, _job_args, _job_kwargs

        # Parse the result components from the returned dictionary
        job, exit_handler, msg_args, msg_kwargs = parse_message_transformer_results(**results)

        if isinstance(msg_args, list) or isinstance(msg_args, tuple):
            job_args = tuple(msg_args) + additional_job_args
        else:
            job_args = (msg_args,) + additional_job_args  # single element

        if msg_kwargs.keys() & additional_job_kwargs.keys():
            raise KeyError(
                "message_transformer produces named keyword args that are duplicative of those in "
                "additional_job_kwargs and would be overwritten"
            )
        job_kwargs = {**msg_kwargs, **additional_job_kwargs}

        return self._dispatch(job, worker_process_name, exit_handler, *job_args, **job_kwargs)

    def _dequeue_message(self, wait_time):
        """Attempt to get a single message from the queue.

        It will set this message in the `self._current_sqs_message` field if received, otherwise it leaves that None

        Args:
            wait_time: If no message is readily available, wait for this many seconds for one to arrive before
                returning
        """
        try:
            # NOTE: Forcing MaxNumberOfMessages=1
            # This will pull at most 1 message off the queue, or no messages. This dispatcher is built to dispatch
            # one queue message at a time for work, such that one job at a time is handled by the child worker
            # process. The best way to then scale up work-throughput is to have multiple consumers (e.g. multiple
            # parent dispatcher processes on one machine, or multiple machines each running a parent dispatcher process)
            # This may not be an ideal configuration for jobs that may expect to complete with sub-second performance,
            # and handle a massive amount of message-throughput (e.g. 10+ messages/second, or 1M+ messages/day),
            # where the added latency of connecting to the queue to fetch each message could add up.
            received_messages = self.sqs_queue_instance.receive_messages(
                WaitTimeSeconds=wait_time,
                AttributeNames=["All"],
                MessageAttributeNames=["All"],
                VisibilityTimeout=self._default_visibility_timeout,
                MaxNumberOfMessages=1,
            )
        except (EndpointConnectionError, ClientError, NoCredentialsError, NoRegionError) as conn_exc:
            log_dispatcher_message(
                self, message="SQS connection issue. See Traceback and investigate settings", is_exception=True
            )
            raise SystemExit(1) from conn_exc
        except Exception as exc:
            log_dispatcher_message(
                self,
                message="Unknown error occurred when attempting to receive messages from the queue. See Traceback",
                is_exception=True,
            )
            raise SystemExit(1) from exc

        if received_messages:
            self._current_sqs_message = received_messages[0]
            log_dispatcher_message(self, message="Message received: {}".format(self._current_sqs_message.body))

    def delete_message_from_queue(self):
        """Deletes the message from SQS. This is usually treated as a *successful* culmination of message handling,
        so long as it was not previously copied to the Dead Letter Queue before deletion

        Raises:
            QueueWorkDispatcherError: If for some reason the boto3 delete() operation caused an error
        """
        if self._current_sqs_message is None:
            log_dispatcher_message(
                self,
                message="Message to delete does not exist. "
                "Message might have previously been moved, released, or deleted",
                is_warning=True,
            )
            return
        try:
            self._current_sqs_message.delete()
            self._current_sqs_message = None
        except Exception as exc:
            message = (
                "Unable to delete SQS message from queue. "
                "Message might have previously been deleted upon completion or failure"
            )
            log_dispatcher_message(self, message=message, is_exception=True)
            raise QueueWorkDispatcherError(
                message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            ) from exc

    def surrender_message_to_other_consumers(self, delay=0):
        """Return the message back into its original queue for other consumers to process it.

        Does this by making it immediately visible (it was always there, just invisible).

        See Also:
            https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout  # noqa

        Args:
            delay (int): How long to wait in seconds before message becomes visible to other consumers. Default
            is 0 seconds (immediately consumable)
        """
        # Protect against the scenario where one of many consumers is in the midst of a long-poll when an exit signal
        # is received. If they are in this position, and the message is returned to the queue, they will dequeue it
        # (receive it) when they should not have, because they should have been responding to the signal.
        # They will respond to the signal, but not until after they return from receiving the message, and the message
        # will be lost.
        # So, so only make the message visible after any long-poll has returned
        if self._long_poll_seconds and self._long_poll_seconds > 0:
            delay = self._long_poll_seconds + 1

        if self._current_sqs_message:
            retried = 0
            if self._current_sqs_message.attributes.get("ApproximateReceiveCount") is not None:
                retried = int(self._current_sqs_message.attributes.get("ApproximateReceiveCount"))
            log_dispatcher_message(
                self,
                message="Returning message to its queue for retry number {} in {} seconds. "
                "If this exceeds the configured number of retries in the queue, it "
                "will be moved to its Dead Letter Queue".format(retried + 1, delay),
            )

        self._set_message_visibility(delay)

    def move_message_to_dead_letter_queue(self):
        """Move the current message to the Dead Letter Queue.

        Moves the message to the Dead Letter Queue associated with this worker's SQS queue via its RedrivePolicy.
        Then delete the message from its originating queue.

        Raises:
            QueueWorkDispatcherError: Raise an error if there is no RedrivePolicy or a Dead Letter Queue is not
                configured, specified, or connectible for this queue.
        """
        if self._current_sqs_message is None:
            log_dispatcher_message(
                self,
                message="Unable to move SQS message to the dead letter queue. No current message exists. "
                "Message might have previously been moved, released, or deleted",
                is_warning=True,
            )
            return

        dlq_name = "not discovered yet"
        try:
            redrive_policy = self.sqs_queue_instance.attributes.get("RedrivePolicy")
            if not redrive_policy:
                error = (
                    'Failed to move message to dead letter queue. Cannot get RedrivePolicy for SQS queue "{}". '
                    "It was not set, or was not included as an attribute to be "
                    "retrieved with this queue.".format(self.sqs_queue_instance)
                )
                log_dispatcher_message(self, message=error, is_error=True)
                raise QueueWorkDispatcherError(
                    error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
                )

            redrive_json = json.loads(redrive_policy)
            dlq_arn = redrive_json.get("deadLetterTargetArn")
            if not dlq_arn:
                error = (
                    "Failed to move message to dead letter queue. "
                    "Cannot find a dead letter queue in the RedrivePolicy "
                    'for SQS queue "{}"'.format(self.sqs_queue_instance)
                )
                log_dispatcher_message(self, message=error, is_error=True)
                raise QueueWorkDispatcherError(
                    error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
                )
            dlq_name = dlq_arn.split(":")[-1]

            # Copy the message to the designated dead letter queue
            dlq = get_sqs_queue(queue_name=dlq_name)
            message_attr = (
                self._current_sqs_message.message_attributes.copy()
                if self._current_sqs_message.message_attributes
                else {}
            )
            dlq_response = dlq.send_message(MessageBody=self._current_sqs_message.body, MessageAttributes=message_attr)
        except Exception as exc:
            error = (
                "Error occurred when parent dispatcher with PID [{}] tried to move the message "
                "for worker process with PID [{}] "
                "to the dead letter queue [{}].".format(os.getpid(), self._worker_process.pid, dlq_name)
            )
            log_dispatcher_message(self, message=error, is_exception=True)
            raise QueueWorkDispatcherError(
                error, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            ) from exc

        log_dispatcher_message(
            self,
            message='Message sent to dead letter queue "{}" '
            "with [{}] response code. "
            "Now deleting message from origin queue".format(
                dlq_name, dlq_response["ResponseMetadata"]["HTTPStatusCode"]
            ),
            is_debug=True,
        )
        self.delete_message_from_queue()

    def _monitor_work_progress(self):
        """Tracks the running or exit status of the child worker process.

        There are four scenarios it detects when monitoring:
            1. It extends life of the process back at the queue if it detects the child worker process is still
               running
            2. It sees the child worker process exited with a 0 exit code (success). It deletes the message from
               the queue and logs success.
            3. It sees the child worker process exited with a > 0 exit code. Handles this and raises
               :exc:`QueueWorkerProcessError` to alert the caller.
            4. It sees the child worker process exited with a < 0 exit code, indicating it received some kind of
               signal. It calls the signal-handling code to handle this accordingly.

        Raises:
            QueueWorkerProcessError: When the child worker process was found to have exited with a > 0 exit
                code.
            Others: Anything raised by :meth:`_set_message_visibility`, :meth:`delete_message_from_queue`,
                or :meth:`_handle_exit_signal`
        """
        monitor_process = True
        heartbeats = 0
        while monitor_process:
            monitor_process = False
            if self._worker_process.is_alive():
                # Process still working. Send "heartbeat" to SQS so it may continue
                if (heartbeats * self._monitor_sleep_time) >= self._sqs_heartbeat_log_period_seconds:
                    log_dispatcher_message(
                        self,
                        message="Job worker process with PID [{}] is still running. "
                        "Renewing VisibilityTimeout of {} seconds".format(
                            self._worker_process.pid, self._default_visibility_timeout
                        ),
                        is_debug=True,
                    )
                    heartbeats = 0
                else:
                    heartbeats += 1
                self._set_message_visibility(self._default_visibility_timeout)
                time.sleep(self._monitor_sleep_time)
                monitor_process = True
            elif self._worker_process.exitcode == 0:
                # If process exits with 0: success! Remove from queue
                log_dispatcher_message(
                    self,
                    message="Job worker process with PID [{}] completed with 0 for exit code (success). Deleting "
                    "message from the queue".format(self._worker_process.pid),
                )
                self.delete_message_from_queue()
            elif self._worker_process.exitcode > 0:
                # If process exits with positive code, an ERROR occurred within the worker process.
                # Don't delete the message, don't force retry, and don't force into dead letter queue.
                # Let the VisibilityTimeout expire, and the queue will handle whether to retry or put in
                # the DLQ based on its queue configuration.
                # Raise an exception to give control back to the caller to allow any error-handling to take place there
                message = "Job worker process with PID [{}] errored with exit code: {}.".format(
                    self._worker_process.pid, self._worker_process.exitcode
                )
                log_dispatcher_message(self, message=message, is_error=True)
                raise QueueWorkerProcessError(
                    message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
                )
            elif self._worker_process.exitcode < 0:
                # If process exits with a negative code, process was terminated by a signal since
                # a Python subprocess returns the negative value of the signal.
                signum = self._worker_process.exitcode * -1
                # In the rare case where the child worker process's exit signal is detected before its parent
                # process received the same signal, proceed with handling the child's exit signal
                self._handle_exit_signal(signum=signum, frame=None, parent_dispatcher_signaled=False)

    def _handle_exit_signal(self, signum, frame, parent_dispatcher_signaled=True, is_retry=False):
        """Attempt to gracefully handle the exiting of the job as a result of receiving an exit signal.

        NOTE: This handler is only expected to be run from the parent dispatcher process. It is an error
        condition if it is invoked from the child worker process. Signals received in the child worker
        process should not have this handler registered for those signals. Because signal handlers registered for a
        parent process are inherited by forked child processes (see: "man 7 signal" docs and search for
        "inherited"), this handler will initially be registered for each of this class's EXIT_SIGNALS; however,
        those handlers are reset to their default as the worker process is started by a wrapper function around
        the job to execute.

        The signal very likely indicates a non-error failure scenario from which the job might be restarted or
        rerun, if allowed. Handle cleanup, logging, job retry logic, etc.
        NOTE: Order is important:
            1. "Lock" (guard) this exit handler, to handle one exit signal only
            2. Extend VisibilityTimeout by _exit_handling_timeout, to allow time for cleanup
            3. Suspend child worker process, if alive, else skip (to not interfere with cleanup)
            4. Execute pre-exit cleanup
            5. Move the message (back to the queue, or to the dead letter queue)
            6. Kill child worker process
            7. Exit this parent dispatcher process if it received an exit signal

        Args:
            signum: number representing the signal received
            frame: Frame passed in with the signal
            parent_dispatcher_signaled: Assumed that this handler is being called as a result of signaling that
                happened on the parent dispatcher process. If this was instead the parent detecting that the child
                exited due to a signal, set this to False. Defaults to True.
            is_retry: If this is the 2nd and last try to handled the signal and perform pre-exit cleanup

        Raises:
            QueueWorkerProcessError: When the _exit_handling function cannot be completed in time to perform
                pre-exit cleanup
            QueueWorkerProcessError: If only the child process died (not the parent dispatcher), but the message it
                was working was moved to the Dead Letter Queue because the queue is not configured to allow retries,
                raise an exception so the parent process might mark the job as failed with its normal error-handling
                process
            SystemExit: If the parent dispatcher process received an exit signal, this should be raised
                after handling the signal, as part of running os.kill of that process using the original signal
                number that was originally handled
        """
        # If the process handling this signal is the same as the started worker process, this *is* the worker process
        is_worker_process = self._worker_process and self._worker_process.pid == os.getpid()

        # If the signal is in BSD_SIGNALS, use the human-readable string, otherwise use the signal value
        signal_or_human = BSD_SIGNALS.get(signum, signum)

        if is_worker_process:
            log_dispatcher_message(
                self,
                message="Worker process with PID [{}] received signal [{}] while running. "
                "It should not be handled in this exit signal handler, "
                "as it is meant to run from the parent dispatcher process."
                "Exiting worker process with original exit signal.".format(os.getpid(), signal_or_human),
                is_warning=True,
            )
            self._kill_worker(with_signal=signum)

        elif not parent_dispatcher_signaled:
            log_dispatcher_message(
                self,
                message="Job worker process with PID [{}] exited due to signal with exit code: [{}], "
                "after receiving exit signal [{}]. "
                "Gracefully handling exit of job".format(
                    self._worker_process.pid, self._worker_process.exitcode, signal_or_human
                ),
                is_error=True,
            )
            # Attempt to cleanup children, but likely will be a no-op because the worker has already exited and
            # orphaned them
            self._kill_worker(with_signal=signum, just_kill_descendants=True)

        else:  # dispatcher signaled
            log_dispatcher_message(
                self,
                message="Parent dispatcher process with PID [{}] received signal [{}] while running. "
                "Gracefully stopping job being worked".format(os.getpid(), signal_or_human),
                is_error=True,
            )

            # Make sure the parent dispatcher process exits after handling the signal by setting this flag
            self._dispatcher_exiting = True

        if self._handling_exit_signal and not is_retry:
            # Don't allow the parent dispatcher process to handle more than one exit signal, unless this is a retry
            # of the first attempt
            return

        self._handling_exit_signal = True

        exit_handling_failed = False  # flag to mark this kind of end-state of this handler
        try:
            # Extend message visibility for as long is given for the exit handling to process, so message does not get
            # prematurely returned to the queue or moved to the dead letter queue.
            # Give it a 5 second buffer in case retry mechanics put it over the timeout
            self._set_message_visibility(self._exit_handling_timeout + 5)

            if self._worker_process.is_alive():
                worker = ps.Process(self._worker_process.pid)
                if not is_retry:
                    # Suspend the child worker process so it does not conflict with doing cleanup in the exit_handler
                    log_dispatcher_message(
                        self,
                        message="Suspending worker process with PID [{}] during first try of exit_handler".format(
                            self._worker_process.pid
                        ),
                        is_debug=True,
                    )
                    worker.suspend()
                else:
                    # This is try 2. The cleanup was not able to complete on try 1 with the worker process merely
                    # suspended. There could be some kind of transaction deadlock. Kill the worker to clear the way
                    # for cleanup retry
                    log_dispatcher_message(
                        self,
                        message="Killing worker process with PID [{}] during second try of exit_handler".format(
                            self._worker_process.pid
                        ),
                        is_debug=True,
                    )
                    self._kill_worker()
            else:
                try_attempt = "second" if is_retry else "first"
                log_dispatcher_message(
                    self,
                    message="Worker process with PID [{}] is not alive during {} try of exit_handler".format(
                        self._worker_process.pid, try_attempt
                    ),
                    is_debug=True,
                )

            if self._exit_handler is not None:
                # Execute cleanup procedures to handle exiting of the worker process
                # Wrap cleanup in a fixed timeout, and a retry (one time)
                try:
                    with ExecutionTimeout(self._exit_handling_timeout):
                        # Call _exit_handler callable to do cleanup
                        queue_message_param = inspect.signature(self._exit_handler).parameters.get("queue_message")
                        arg_spec = inspect.getfullargspec(self._exit_handler)
                        if queue_message_param or arg_spec.varkw == "kwargs":
                            # it defines a param named "queue_message", or accepts kwargs,
                            # so pass along the message as "queue_message" in  case it's needed
                            log_dispatcher_message(
                                self,
                                message="Invoking job exit_handler [{}] with args={}, kwargs={}, "
                                "and queue_message={}".format(
                                    self._exit_handler, self._job_args, self._job_kwargs, self._current_sqs_message
                                ),
                            )
                            self._exit_handler(
                                *self._job_args, **self._job_kwargs, queue_message=self._current_sqs_message
                            )
                        else:
                            log_dispatcher_message(
                                self,
                                message="Invoking job exit_handler [{}] with args={} "
                                "and kwargs={}".format(self._exit_handler, self._job_args, self._job_kwargs),
                            )
                            self._exit_handler(*self._job_args, **self._job_kwargs)
                except TimeoutError:
                    exit_handling_failed = True
                    if not is_retry:
                        log_dispatcher_message(
                            self,
                            message="Could not perform cleanup during exiting of job in allotted "
                            "_exit_handling_timeout ({}s). "
                            "Retrying once.".format(self._exit_handling_timeout),
                            is_warning=True,
                        )
                        # Attempt retry
                        self._handle_exit_signal(signum, frame, parent_dispatcher_signaled, is_retry=True)
                    else:
                        message = (
                            "Could not perform cleanup during exiting of job in allotted "
                            "_exit_handling_timeout ({}s) after 2 tries. "
                            "Raising exception.".format(self._exit_handling_timeout)
                        )
                        log_dispatcher_message(self, message=message, is_error=True)
                        raise QueueWorkDispatcherError(
                            message,
                            worker_process_name=self.worker_process_name,
                            queue_message=self._current_sqs_message,
                        )
                except Exception as exc:
                    exit_handling_failed = True
                    message = "Execution of exit_handler failed for unknown reason. See Traceback."
                    log_dispatcher_message(self, message=message, is_exception=True)
                    raise QueueWorkDispatcherError(
                        message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
                    ) from exc

            if self.allow_retries:
                log_dispatcher_message(
                    self,
                    message="Dispatcher for this job allows retries of the message. "
                    "Attempting to return message to its queue.",
                )
                self.surrender_message_to_other_consumers()
            else:
                # Otherwise, attempt to send message directly to the dead letter queue
                log_dispatcher_message(
                    self,
                    message="Dispatcher for this job does not allow message to be retried. "
                    "Attempting to move message to its Dead Letter Queue.",
                )
                self.move_message_to_dead_letter_queue()

            # Now that SQS message has been handled, ensure worker process is dead
            if self._worker_process.is_alive():
                log_dispatcher_message(
                    self,
                    message="Message handled. Now killing worker process with PID [{}] "
                    "as it is alive but has no more work to do.".format(self._worker_process.pid),
                    is_debug=True,
                )
                self._kill_worker()
        finally:
            if exit_handling_failed:
                # Don't perform final logic if we got here after failing to execute the exit handler
                if is_retry:
                    # 2nd time we've failed to process exit_handle? - log so and move to DLQ
                    log_dispatcher_message(
                        self,
                        message="Graceful exit_handling failed after 2 attempts for job being worked by worker process "
                        "with PID [{}]. An attempt to put message directly on the Dead Letter Queue will be "
                        "made, because cleanup logic in the exit_handler could not be guaranteed "
                        "(retrying a message without prior cleanup "
                        "may compound problems).".format(self._worker_process.pid),
                        is_error=True,
                    )
                    self.move_message_to_dead_letter_queue()
            else:
                # exit_handler not provided, or processed successfully
                self._handling_exit_signal = False
                if self._dispatcher_exiting:
                    # An exit signal was received by the parent dispatcher process.
                    # Continue with exiting the parent process, as per the original signal, after having handled it
                    log_dispatcher_message(
                        self,
                        message="Exiting from parent dispatcher process with PID [{}] "
                        "to culminate exit-handling of signal [{}]".format(os.getpid(), signal_or_human),
                        is_debug=True,
                    )
                    # Simply doing sys.exit or raise SystemExit, even with a given negative exit code won't exit this
                    # process with the negative signal value, as is the Python custom.
                    # So override the assigned handler for the signal we're handing, resetting it back to default,
                    # and kill the process using that signal to get it to exit as it would if we never handled it
                    signal.signal(signum, signal.SIG_DFL)
                    os.kill(os.getpid(), signum)
                elif not self.allow_retries:
                    # If only the child process died, but the message it was working was moved to the Dead Letter Queue
                    # because the queue is not configured to allow retries, raise an exception so the caller
                    # might mark the job as failed with its normal error-handling process
                    raise QueueWorkerProcessError(
                        "Worker process with PID [{}] was not able to complete its job due to "
                        "interruption from exit signal [{}]. The queue message for that job "
                        "was moved to the Dead Letter Queue, where it can be reviewed for a "
                        "manual restart, or other remediation.".format(self._worker_process.pid, signal_or_human),
                        worker_process_name=self.worker_process_name,
                        queue_message=self._current_sqs_message,
                    )

    def _set_message_visibility(self, new_visibility):
        """Sends a request back to the SQS API via boto3 to refresh the time that the currently-in-flight message
        will be *invisible* to other consumers of the queue, such that they cannot also process it.

        Examples:
            If :param:`new_visibility` is 60, then in 60 seconds from when SQS receives the request, so long as
            the ``VisibilityTimeout`` is not adjusted again, the in-flight message will reappear on the queue. It
            will remain invisible until that time.

        Raises:
            QueueWorkDispatcherError: If for some reason the boto3 change_visibility() operation caused an error
        """
        if self._current_sqs_message is None:
            log_dispatcher_message(
                self,
                message="No SQS message to change visibility of. Message might have previously been released, "
                "deleted, or moved to dead letter queue.",
                is_warning=True,
            )
            return
        try:
            self._current_sqs_message.change_visibility(VisibilityTimeout=new_visibility)
        except Exception as exc:
            message = (
                "Unable to set VisibilityTimeout. "
                "Message might have previously been deleted upon completion or failure"
            )
            log_dispatcher_message(self, message=message, is_exception=True)
            raise QueueWorkDispatcherError(
                message, worker_process_name=self.worker_process_name, queue_message=self._current_sqs_message
            ) from exc

    def _kill_worker(self, with_signal=None, just_kill_descendants=False):
        """
        Cleanup (kill) a worker process and its spawned descendant processes
        Args:
            with_signal: Use this termination signal when killing. Otherwise, hard kill (-9)
            just_kill_descendants: Set to True to leave the worker and only kill its descendants
        """
        if not self._worker_process or not ps.pid_exists(self._worker_process.pid):
            return
        try:
            worker = ps.Process(self._worker_process.pid)
        except ps.NoSuchProcess:
            return

        if self._worker_can_start_child_processes:
            # First kill any other processes the worker may have spawned
            for spawn_of_worker in worker.children(recursive=True):
                if with_signal:
                    log_dispatcher_message(
                        self,
                        message=f"Attempting to terminate child process with PID [{spawn_of_worker.pid}] and name "
                        f"[{spawn_of_worker.name}] using signal [{with_signal}]",
                        is_warning=True,
                    )
                    try:
                        spawn_of_worker.send_signal(with_signal)
                    except ps.NoSuchProcess:
                        pass
                else:
                    log_dispatcher_message(
                        self,
                        message=f"Attempting to kill child process with PID [{spawn_of_worker.pid}] and name "
                        f"[{spawn_of_worker.name}]",
                        is_warning=True,
                    )
                    try:
                        spawn_of_worker.kill()
                    except ps.NoSuchProcess:
                        pass
        if not just_kill_descendants:
            if with_signal:
                # Ensure the worker exits as would have occurred if not handling its signals
                signal.signal(with_signal, signal.SIG_DFL)
                worker.send_signal(with_signal)
            else:
                try:
                    worker.kill()
                except ps.NoSuchProcess:
                    pass