localstack/localstack

View on GitHub
bin/localstack-supervisor

Summary

Maintainability
Test Coverage
#!/usr/bin/env python3
"""Supervisor script for managing localstack processes, acting like a mini init system tailored to
localstack. This can be used on the host or in the docker-entrypoint.sh.

The supervisor behaves as follows:
* SIGUSR1 to supervisor will terminate the localstack instance and then start a new process
* SIGTERM to supervisor will terminate the localstack instance and then return
* if the localstack instance exits, then the supervisor exits with the same exit code.

The methods ``waitpid_reap_other_children`` and ``stop_child_process`` were adapted from baseimage-docker
licensed under MIT: https://github.com/phusion/baseimage-docker/blob/rel-0.9.16/image/bin/my_init"""
import errno
import os
import signal
import subprocess
import sys
import threading
from typing import Optional

LOCALSTACK_COMMAND = [sys.executable, "-m", "localstack.runtime.main"]
"""This command is used to run the localstack process"""

# debug logging of the supervisor is enabled when DEBUG is "1" or "true" (case-insensitive, whitespace ignored)
DEBUG = os.environ.get("DEBUG", "").strip().lower() in ("1", "true")

# number of seconds a process is given to shut down before it is killed; can be raised via the
# SHUTDOWN_TIMEOUT environment variable to allow for longer shutdown procedures (an unset or blank
# value falls back to 5)
DEFAULT_SHUTDOWN_TIMEOUT = int(os.environ.get("SHUTDOWN_TIMEOUT", "").strip() or 5)


class AlarmException(Exception):
    """Special exception that is raised when SIGALRM is received."""


def log(message: str):
    """
    Prints the given message to stdout with a logging prefix. Nothing is printed unless ``DEBUG`` is enabled.

    :param message: the text to log
    """
    if DEBUG:
        print(f"LocalStack supervisor: {message}")


# Wait statuses of child processes that were reaped while waiting for a *different* PID, keyed by PID.
_terminated_child_processes = {}


def waitpid_reap_other_children(pid: int) -> Optional[int]:
    """
    Waits for the child process with the given PID, while at the same time reaping any other child
    processes that have exited (e.g. adopted child processes that have terminated).

    Statuses of other children reaped along the way are remembered, so that a later call for one of
    those PIDs returns immediately with the recorded status.

    :param pid: the pid of the process
    :returns: the wait status of the process as reported by ``os.waitpid``, or ``None`` if there is
        nothing left to wait for (``ECHILD``/``ESRCH``)
    :raises OSError: for any other error reported by ``os.waitpid``
    """
    # A previous call to waitpid_reap_other_children(), with an argument not equal to the current
    # argument, may already have waited for this process. Return the status that was obtained back
    # then. The check is on membership rather than truthiness, because the status of a process that
    # exited cleanly is 0 and must still be served (and removed) from the cache.
    if pid in _terminated_child_processes:
        return _terminated_child_processes.pop(pid)

    while True:
        try:
            # -1 waits for *any* child, which is what allows reaping other terminated children
            this_pid, status = os.waitpid(-1, 0)
        except OSError as e:
            if e.errno in (errno.ECHILD, errno.ESRCH):
                # no child processes (left) to wait for
                return None
            raise

        if this_pid == pid:
            return status

        # Some other child terminated: save its status for later.
        _terminated_child_processes[this_pid] = status


def stop_child_process(name: str, pid: int, sig: int = signal.SIGTERM, timeout: Optional[int] = None) -> None:
    """
    Sends a signal to the given process and then waits for all child processes to avoid zombie processes.

    The wait is bounded by ``signal.alarm``: if the process has not been reaped when the alarm fires,
    it is sent SIGKILL and waited for once more. This relies on a SIGALRM handler that raises
    ``AlarmException`` being installed (``main`` does this).

    :param name: readable process name to log
    :param pid: the pid to terminate
    :param sig: the signal to send to the process
    :param timeout: the wait timeout in seconds; ``None`` or ``0`` falls back to ``DEFAULT_SHUTDOWN_TIMEOUT``
    """
    log(f"Shutting down {name} (PID {pid})...")
    try:
        os.kill(pid, sig)
    except OSError:
        # best effort: errors from sending the signal are deliberately ignored
        pass
    timeout = timeout or DEFAULT_SHUTDOWN_TIMEOUT
    # arm the alarm; SIGALRM is delivered after ``timeout`` seconds unless cancelled below
    signal.alarm(timeout)
    try:
        waitpid_reap_other_children(pid)
    except OSError:
        pass
    except AlarmException:
        # the graceful shutdown took too long, so fall back to SIGKILL
        log(f"{name} (PID {pid}) did not shut down in time. Forcing it to exit.")
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass
        try:
            # reap the killed process; no alarm is armed for this second wait
            waitpid_reap_other_children(pid)
        except OSError:
            pass
    finally:
        # cancel a still-pending alarm so it cannot fire after this function returned
        signal.alarm(0)


def main():
    """
    Entry point of the supervisor: installs the signal handlers, then starts the localstack process
    and waits for it, starting a new one whenever a restart was requested via SIGUSR1. Exits the
    interpreter with the exit code obtained from the last localstack process.
    """
    # handle of the currently running localstack process; read by the signal handlers below
    localstack_proc: Optional[subprocess.Popen] = None

    # set by the SIGUSR1 handler; tells the main loop to start a new process instead of exiting
    should_restart = threading.Event()

    # signal handlers

    def _on_alarm(signum, frame):
        # converts SIGALRM into an exception so that a blocking wait can be interrupted
        raise AlarmException()

    def _on_terminate(signum, frame):
        # only stops the localstack process (if there is one)
        if localstack_proc:
            stop_child_process("localstack", localstack_proc.pid, signal.SIGTERM)

    def _on_restart(signum, frame):
        # this handler terminates localstack but leaves the supervisor in a state to restart it
        if localstack_proc:
            should_restart.set()
            stop_child_process("localstack", localstack_proc.pid, signal.SIGTERM)

    signal.signal(signal.SIGALRM, _on_alarm)
    signal.signal(signal.SIGTERM, _on_terminate)
    # TODO investigate: when we tried to forward SIGINT to LS, for some reason SIGINT was raised twice in LS
    #  yet setting this to a no-op also worked. since we couldn't really figure out what was going on, we just
    #  translate SIGINT to SIGTERM for the localstack process.
    signal.signal(signal.SIGINT, _on_terminate)
    signal.signal(signal.SIGUSR1, _on_restart)

    # sets the supervisor PID so localstack can signal to it more easily
    os.environ["SUPERVISOR_PID"] = str(os.getpid())

    exit_code = 0
    try:
        log("starting")
        while True:
            # forget any restart request that belonged to the previous process
            should_restart.clear()

            # start a new localstack process, with its stderr merged into the supervisor's stdout
            localstack_proc = subprocess.Popen(
                LOCALSTACK_COMMAND,
                stdout=sys.stdout,
                stderr=subprocess.STDOUT,
            )
            log(f"localstack process (PID {localstack_proc.pid}) starting")

            # block until the localstack process returns
            exit_code = localstack_proc.wait()
            log(f"localstack process (PID {localstack_proc.pid}) returned with exit code {exit_code}")

            # make sure that, if the localstack process terminates of its own accord, we still reap
            # all child processes
            waitpid_reap_other_children(localstack_proc.pid)

            # leave the loop unless a restart was requested while the process was running
            if not should_restart.is_set():
                break
    finally:
        log("exiting")

    sys.exit(exit_code)


# run the supervisor only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()