iterative/dvc

View on GitHub
dvc/daemon.py

Summary

Maintainability
A
0 mins
Test Coverage
"""Launch `dvc daemon` command in a separate detached process."""

import inspect
import logging
import os
import subprocess
import sys
from collections.abc import Mapping, Sequence
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, Optional, Union

from dvc.log import logger

if TYPE_CHECKING:
    from contextlib import AbstractContextManager

from dvc.env import DVC_DAEMON, DVC_DAEMON_LOGFILE
from dvc.utils import fix_env, is_binary
from dvc.utils.collections import ensure_list

logger = logger.getChild(__name__)


def _suppress_resource_warning(popen: subprocess.Popen) -> None:
    """Sets the returncode to avoid ResourceWarning when popen is garbage collected."""
    # only use for daemon processes.
    # See https://bugs.python.org/issue38890.
    popen.returncode = 0


def _win_detached_subprocess(args: Sequence[str], **kwargs) -> int:
    assert os.name == "nt"

    from subprocess import (  # type: ignore[attr-defined]
        CREATE_NEW_PROCESS_GROUP,
        CREATE_NO_WINDOW,
        STARTF_USESHOWWINDOW,
        STARTUPINFO,
    )

    # https://stackoverflow.com/a/7006424
    # https://bugs.python.org/issue41619
    creationflags = CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW

    startupinfo = STARTUPINFO()
    startupinfo.dwFlags |= STARTF_USESHOWWINDOW
    popen = subprocess.Popen(
        args,
        close_fds=True,
        shell=False,  # noqa: S603
        startupinfo=startupinfo,
        creationflags=creationflags,
        **kwargs,
    )
    _suppress_resource_warning(popen)
    return popen.pid


def _get_dvc_args() -> list[str]:
    args = [sys.executable]
    if not is_binary():
        root_dir = os.path.abspath(os.path.dirname(__file__))
        main_entrypoint = os.path.join(root_dir, "__main__.py")
        args.append(main_entrypoint)
    return args


def _fork_process() -> int:
    assert os.name == "posix"

    # NOTE: using os._exit instead of sys.exit, because dvc built
    # with PyInstaller has trouble with SystemExit exception and throws
    # errors such as "[26338] Failed to execute script __main__"
    try:
        pid = os.fork()  # type: ignore[attr-defined]
        if pid > 0:
            return pid
    except OSError:
        logger.exception("failed at first fork")
        os._exit(1)

    os.setsid()  # type: ignore[attr-defined]

    try:
        pid = os.fork()  # type: ignore[attr-defined]
        if pid > 0:
            os._exit(0)
    except OSError:
        logger.exception("failed at second fork")
        os._exit(1)

    # disconnect from the terminal
    fd = os.open(os.devnull, os.O_RDWR)
    for fd2 in range(3):
        os.dup2(fd, fd2)
    os.close(fd)
    return pid


def _posix_detached_subprocess(args: Sequence[str], **kwargs) -> int:
    # double fork and execute a subprocess so that there are no zombies
    read_end, write_end = os.pipe()
    pid = _fork_process()
    if pid > 0:  # in parent
        os.close(write_end)
        pid_str = os.read(read_end, 32).decode("utf8")
        os.close(read_end)
        return int(pid_str)

    proc = subprocess.Popen(args, shell=False, close_fds=True, **kwargs)  # noqa: S603
    os.close(read_end)
    os.write(write_end, str(proc.pid).encode("utf8"))
    os.close(write_end)

    exit_code = proc.wait()
    os._exit(exit_code)


def _detached_subprocess(args: Sequence[str], **kwargs) -> int:
    """Run in a detached subprocess."""
    kwargs.setdefault("stdin", subprocess.DEVNULL)
    kwargs.setdefault("stdout", subprocess.DEVNULL)
    kwargs.setdefault("stderr", subprocess.DEVNULL)

    if os.name == "nt":
        return _win_detached_subprocess(args, **kwargs)
    return _posix_detached_subprocess(args, **kwargs)


def _map_log_level_to_flag() -> Optional[str]:
    flags = {logging.DEBUG: "-v", logging.TRACE: "-vv"}  # type: ignore[attr-defined]
    return flags.get(logger.getEffectiveLevel())


def daemon(args: list[str]) -> None:
    """Launch a `dvc daemon` command in a detached process.

    Args:
        args (list): list of arguments to append to `dvc daemon` command.
    """
    if flag := _map_log_level_to_flag():
        args = [*args, flag]
    daemonize(["daemon", *args])


def _spawn(
    args: list[str],
    executable: Optional[Union[str, list[str]]] = None,
    env: Optional[Mapping[str, str]] = None,
    output_file: Optional[str] = None,
) -> int:
    file: "AbstractContextManager[Any]" = nullcontext()
    kwargs = {}
    if output_file:
        file = open(output_file, "ab")  # noqa: SIM115
        kwargs = {"stdout": file, "stderr": file}

    if executable is None:
        executable = _get_dvc_args()
    else:
        executable = ensure_list(executable)

    with file:
        return _detached_subprocess(executable + args, env=env, **kwargs)


def daemonize(args: list[str], executable: Union[None, str, list[str]] = None) -> None:
    if os.name not in ("posix", "nt"):
        return

    if os.environ.get(DVC_DAEMON):
        logger.debug("skipping launching a new daemon.")
        return

    env = fix_env()
    env[DVC_DAEMON] = "1"
    if not is_binary():
        file_path = os.path.abspath(inspect.stack()[0][1])
        env["PYTHONPATH"] = os.path.dirname(os.path.dirname(file_path))

    logger.debug("Trying to spawn %r", args)
    pid = _spawn(args, executable, env, output_file=env.get(DVC_DAEMON_LOGFILE))
    logger.debug("Spawned %r with pid %s", args, pid)