avocado-framework/avocado

View on GitHub
avocado/utils/service.py

Summary

Maintainability
D
2 days
Test Coverage
B
83%
#  Copyright(c) 2013 Intel Corporation.
#
#  This program is free software; you can redistribute it and/or modify it
#  under the terms and conditions of the GNU General Public License,
#  version 2, as published by the Free Software Foundation.
#
#  This program is distributed in the hope it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
#  more details.
#
#  You should have received a copy of the GNU General Public License along with
#  this program; if not, write to the Free Software Foundation, Inc.,
#  51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
#
#  The full GNU General Public License is included in this distribution in
#  the file called "COPYING".
#
# This code was inspired in the autotest project,
#
# client/shared/service.py
# Original author: Ross Brattain <ross.b.brattain@intel.com>

import logging
import os
import re
from tempfile import mkstemp

from avocado.utils import process

LOG = logging.getLogger(__name__)

_COMMAND_TABLE_DOC = """
Taken from http://fedoraproject.org/wiki/SysVinit_to_Systemd_Cheatsheet

service frobozz start
systemctl start frobozz.service
 Used to start a service (not reboot persistent)

service frobozz stop
systemctl stop frobozz.service
 Used to stop a service (not reboot persistent)

service frobozz restart
systemctl restart frobozz.service
 Used to stop and then start a service

service frobozz reload
systemctl reload frobozz.service
 When supported, reloads the config file without interrupting pending
 operations.

service frobozz condrestart
systemctl condrestart frobozz.service
 Restarts if the service is already running.

service frobozz status
systemctl status frobozz.service
 Tells whether a service is currently running.

ls /etc/rc.d/init.d/
systemctl list-unit-files --type=service (preferred)
 Used to list the services that can be started or stopped
ls /lib/systemd/system/*.service /etc/systemd/system/*.service
 Used to list all the services and other units

chkconfig frobozz on
systemctl enable frobozz.service
 Turn the service on, for start at next boot, or other trigger.

chkconfig frobozz off
systemctl disable frobozz.service
 Turn the service off for the next reboot, or any other trigger.

chkconfig frobozz
systemctl is-enabled frobozz.service
 Used to check whether a service is configured to start or not in the current
 environment.

chkconfig --list
systemctl list-unit-files --type=service(preferred)
ls /etc/systemd/system/*.wants/
 Print a table of services that lists which runlevels each is configured on
 or off

chkconfig frobozz --list
ls /etc/systemd/system/*.wants/frobozz.service
 Used to list what levels this service is configured on or off

chkconfig frobozz --add
systemctl daemon-reload
 Used when you create a new service file or modify any configuration
"""


def sys_v_init_result_parser(command):
    """
    Parse results from sys_v style commands.

    command status: return true if service is running.
    command is_enabled: return true if service is enabled.
    command list: return a dict from service name to status.
    command others: return true if operate success.

    :param command: command.
    :type command: str.
    :return: different from the command.
    """
    if command == "status":

        def method(cmd_result):
            """
            Parse method for `service $name status`.

            Returns True if running.
            Returns False if stopped.
            Returns None if unrecognized.
            """
            # If service is stopped, exit_status is also not zero.
            # So, we can't use exit_status to check result.
            # Returns None if unrecognized.
            if re.search(b"unrecognized", cmd_result.stderr.lower()):
                return None
            # Returns False if stopped.
            output = cmd_result.stdout.lower()
            dead_flags = [b"stopped", b"not running", b"dead"]
            for flag in dead_flags:
                if re.search(flag, output):
                    return False
            # If output does not contain a dead flag, check it with "running".
            return bool(re.search(b"running", output))

        return method
    elif command == "list":

        def method(cmd_result):
            """
            Parse method for `service $name list`.

            Return dict from service name to status.

            >>> {"sshd": {0: 'off', 1: 'off', 2: 'off', 3: 'off', 4: 'off',
            >>>           5: 'off', 6: 'off'},
            >>>  "vsftpd": {0: 'off', 1: 'off', 2: 'off', 3: 'off', 4: 'off',
            >>>             5: 'off', 6: 'off'},
            >>>  "xinetd": {'discard-dgram:': 'off',
            >>>             'rsync:': 'off'...'chargen-stream:': 'off'},
            >>>  ...
            >>>  }
            """
            if cmd_result.exit_status:
                raise process.CmdError(cmd_result.command, cmd_result)
            # The final dict to return.
            services_statuses_dict = {}
            # Dict to store status on every target for each service.
            status_per_target_dict = {}
            # Dict to store the status for service based on xinetd.
            xinet_services_dict = {}
            lines = cmd_result.stdout_text.strip().splitlines()
            for line in lines:
                sublines = line.strip().split()
                if len(sublines) == 8:
                    # Service and status on each target.
                    service_name = sublines[0]
                    # Store the status of each target in status_per_target_dict
                    for target in range(7):
                        status = sublines[target + 1].split(":")[-1]
                        status_per_target_dict[target] = status
                    services_statuses_dict[service_name] = status_per_target_dict.copy()

                elif len(sublines) == 2:
                    # Service based on xinetd.
                    service_name = sublines[0].strip(":")
                    status = sublines[-1]
                    xinet_services_dict[service_name] = status

                else:
                    # Header or some lines useless.
                    continue
            # Add xinetd based service in the main dict.
            services_statuses_dict["xinetd"] = xinet_services_dict
            return services_statuses_dict

        return method
    return _ServiceResultParser.default_method


def systemd_result_parser(command):
    """
    Parse results from systemd style commands.

    command status: return true if service is running.
    command is_enabled: return true if service is enabled.
    command list: return a dict from service name to status.
    command others: return true if operate success.

    :param command: command.
    :type command: str.
    :return: different from the command.
    """
    if command == "status":

        def method(cmd_result):
            """
            Parse method for `systemctl status $name.service`.

            Returns True if running.
            Returns False if stopped.
            Returns None if not loaded.
            """
            # If service is stopped, exit_status is also not zero.
            # So, we can't use exit_status to check result.
            output = cmd_result.stdout
            # Returns None if not loaded.
            if not re.search(b"Loaded: loaded", output):
                return None
            # Check it with Active status.
            return output.count(b"Active: active") > 0

        return method
    elif command == "list":

        def method(cmd_result):
            """
            Parse method for `systemctl list $name.service`.

            Return a dict from service name to status.

            e.g:
                {"sshd": "enabled",
                 "vsftpd": "disabled",
                 "systemd-sysctl": "static",
                 ...
                 }
            """
            if cmd_result.exit_status:
                raise process.CmdError(cmd_result.command, cmd_result)
            # Dict to store service name to status.
            _service2status_dict = {}
            lines = cmd_result.stdout_text.strip().splitlines()
            for line in lines:
                sublines = line.strip().split()
                if (not len(sublines) == 2) or (not sublines[0].endswith("service")):
                    # Some lines useless.
                    continue
                service_name = sublines[0].rstrip(".service")
                status = sublines[-1]
                _service2status_dict[service_name] = status
            return _service2status_dict

        return method
    return _ServiceResultParser.default_method


def sys_v_init_command_generator(command):
    """
    Generate lists of command arguments for sys_v style inits.

    :param command: start,stop,restart, etc.
    :type command: str
    :return: list of commands to pass to process.run or similar function
    :rtype: builtin.list
    """
    command_name = "service"
    if command == "is_enabled":
        command_name = "chkconfig"
        command = ""
    elif command == "enable":
        command_name = "chkconfig"
        command = "on"
    elif command == "disable":
        command_name = "chkconfig"
        command = "off"
    elif command == "list":
        # noinspection PyUnusedLocal
        def _list_command(_):
            return ["chkconfig", "--list"]

        return _list_command
    elif command == "set_target":

        def _set_target_command(target):
            target = convert_systemd_target_to_runlevel(target)
            return ["telinit", target]

        return _set_target_command
    # Do not need reset failed, mask and unmask in sys_v style system.
    elif command in ["reset_failed", "mask", "unmask"]:

        def _true_command(_):
            return ["true"]

        return _true_command

    def _method(service_name):
        return [command_name, service_name, command]

    return _method


def systemd_command_generator(command):
    """
    Generate list of command line argument strings for systemctl.

    One argument per string for compatibility Popen

    WARNING: If systemctl detects that it is running on a tty it will use
    color, pipe to $PAGER, change column sizes and not truncate unit names.
    Use --no-pager to suppress pager output, or set PAGER=cat in the
    environment. You may need to take other steps to suppress color output.
    See https://bugzilla.redhat.com/show_bug.cgi?id=713567

    :param command: start,stop,restart, etc.
    :type command: str
    :return: List of command and arguments to pass to process.run or similar
            functions
    :rtype: builtin.list
    """
    command_name = "systemctl"
    if command == "is_enabled":
        command = "is-enabled"
    elif command == "list":
        # noinspection PyUnusedLocal
        def _list_command(_):
            # systemctl pipes to `less` or $PAGER by default. Workaround this
            # add '--full' to avoid systemctl truncates service names.
            return [
                command_name,
                "list-unit-files",
                "--type=service",
                "--no-pager",
                "--full",
            ]

        return _list_command
    elif command == "set_target":

        def _set_target_command(target):
            return [command_name, "isolate", target]

        return _set_target_command
    elif command == "reset_failed":
        command = "reset-failed"

    def _method(service_name):
        return [command_name, command, f"{service_name}.service"]

    return _method


# mapping command/whether it requires root
COMMANDS = (
    ("start", True),
    ("stop", True),
    ("reload", True),
    ("restart", True),
    ("condrestart", True),
    ("status", False),
    ("enable", True),
    ("disable", True),
    ("is_enabled", False),
    ("list", False),
    ("set_target", True),
    ("reset_failed", True),
    ("mask", True),
    ("unmask", True),
)


class _ServiceResultParser:  # pylint: disable=too-few-public-methods
    """
    A class that contains staticmethods to parse the result of service command.
    """

    def __init__(self, result_parser, command_list=COMMANDS):
        """
        Create staticmethods for each command in command_list using setattr.

        :param result_parser: function that generates functions that parse the
                result of command.
        :type result_parser: function
        :param command_list: list of all the commands, e.g. start, stop,
                restart, etc.
        :type command_list: list
        """
        self.commands = command_list
        for command, _ in self.commands:
            setattr(self, command, result_parser(command))

    @staticmethod
    def default_method(cmd_result):
        """
        Default method to parse result from command which is not 'list'/'status'

        Returns True if command was executed successfully.
        """
        if cmd_result.exit_status:
            LOG.debug(cmd_result)
            return False
        return True


class _ServiceCommandGenerator:  # pylint: disable=too-few-public-methods
    """
    Generate command lists for starting/stopping services.
    """

    def __init__(self, command_generator, command_list=COMMANDS):
        """
        Create staticmethods for each command in command_list.

        :param command_generator: function that generates functions that
                generate lists of command strings
        :type command_generator: function
        :param command_list: list of all the commands, e.g. start, stop,
                restart, etc.
        :type command_list: list
        """
        self.commands = command_list
        for command, _ in self.commands:
            setattr(self, command, command_generator(command))


def get_name_of_init(run=process.run):
    """
    Internal function to determine what executable is PID 1

    It does that by checking /proc/1/exe.
    Fall back to checking /proc/1/cmdline (local execution).

    :return: executable name for PID 1, aka init
    :rtype:  str
    """
    if run is process.run:
        # On a local run, there are better ways to check
        # our PID 1 executable name.
        try:
            return os.path.basename(os.readlink("/proc/1/exe"))
        except OSError:
            with open("/proc/1/cmdline", "r") as cmdline:  # pylint: disable=W1514
                init = cmdline.read().split(chr(0))[0]
                try:
                    init = os.readlink(init)
                except OSError:
                    pass
                return os.path.basename(init)
    else:
        output = run("readlink /proc/1/exe").stdout.strip()
        if isinstance(output, bytes):
            output = output.decode(encoding="utf-8")
        return os.path.basename(output)


class _SpecificServiceManager:  # pylint: disable=too-few-public-methods
    def __init__(
        self,
        service_name,
        service_command_generator,
        service_result_parser,
        run=process.run,
    ):
        """
        Create staticmethods that call process.run with the given service_name

        >>> my_generator = auto_create_specific_service_command_generator
        >>> lldpad = SpecificServiceManager("lldpad", my_generator())
        >>> lldpad.start()
        >>> lldpad.stop()

        :param service_name: init service name or systemd unit name
        :type service_name: str
        :param service_command_generator: a sys_v_init or systemd command
                generator
        :type service_command_generator: _ServiceCommandGenerator
        :param run: function that executes the commands and return CmdResult
                object, default process.run
        :type run: function
        """
        for cmd, requires_root in service_command_generator.commands:
            run_func = run
            parse_func = getattr(service_result_parser, cmd)
            command = getattr(service_command_generator, cmd)
            setattr(
                self,
                cmd,
                self.generate_run_function(
                    run_func=run_func,
                    parse_func=parse_func,
                    command=command,
                    service_name=service_name,
                    requires_root=requires_root,
                ),
            )

    @staticmethod
    def generate_run_function(
        run_func, parse_func, command, service_name, requires_root
    ):
        """
        Generate the wrapped call to process.run for the given service_name.

        :param run_func:  function to execute command and return CmdResult
                object.
        :type run_func:  function
        :param parse_func: function to parse the result from run.
        :type parse_func: function
        :param command: partial function that generates the command list
        :type command: function
        :param service_name: init service name or systemd unit name
        :type service_name: str
        :param requires_root: whether command needs superuser privileges
        :type requires_root: bool
        :return: wrapped process.run function.
        :rtype: function
        """

        def run(**kwargs):
            """
            Wrapped process.run invocation that will start/stop/etc a service.

            :param kwargs: extra arguments to process.run, .e.g. timeout.
                    But not for ignore_status.
                    We need a CmdResult to parse and raise an
                    exception.TestError if command failed.
                    We will not let the CmdError out.
            :return: result of parse_func.
            """
            # If run_func is process.run by default, we need to set
            # ignore_status = True. Otherwise, skip this setting.
            if run_func is process.run:
                LOG.debug("Setting ignore_status to True.")
                kwargs["ignore_status"] = True
                if requires_root:
                    LOG.debug("Setting sudo to True.")
                    kwargs["sudo"] = True
            cmd = " ".join(command(service_name))
            result = run_func(cmd, **kwargs)
            return parse_func(result)

        return run


class _GenericServiceManager:  # pylint: disable=too-few-public-methods
    """
    Base class for SysVInitServiceManager and SystemdServiceManager.
    """

    def __init__(
        self, service_command_generator, service_result_parser, run=process.run
    ):
        """
        Create staticmethods for each service command, e.g. start, stop

        These staticmethods take as an argument the service to be started or
        stopped.

        >>> my_generator = auto_create_specific_service_command_generator
        >>> systemd = SpecificServiceManager(my_generator())
        >>> systemd.start("lldpad")
        >>> systemd.stop("lldpad")

        :param service_command_generator: a sys_v_init or systemd command
                generator
        :type service_command_generator: _ServiceCommandGenerator
        :param run: function to call the run the commands, default process.run
        :type run: function
        """
        for cmd, requires_root in service_command_generator.commands:
            parse_func = getattr(service_result_parser, cmd)
            command = getattr(service_command_generator, cmd)
            setattr(
                self,
                cmd,
                self.generate_run_function(
                    run_func=run,
                    parse_func=parse_func,
                    command=command,
                    requires_root=requires_root,
                ),
            )

    @staticmethod
    def generate_run_function(run_func, parse_func, command, requires_root):
        """
        Generate the wrapped call to process.run for the service command.

        :param run_func:  process.run
        :type run_func:  function
        :param command: partial function that generates the command list
        :type command: function
        :param requires_root: whether command needs superuser privileges
        :type requires_root: bool
        :return: wrapped process.run function.
        :rtype: function
        """

        def run(service="", **kwargs):
            """
            Wrapped process.run invocation that will start/stop/etc. a service.

            :param service: service name, e.g. crond, dbus, etc.
            :param kwargs: extra arguments to process.run, .e.g. timeout.
                    But not for ignore_status. We need a CmdResult to parse
                    and raise a exception.TestError if command failed.
                    We will not let the CmdError out.
            :return: result of parse_func.
            """
            # If run_func is process.run by default, we need to set
            # ignore_status = True. Otherwise, skip this setting.
            if run_func is process.run:
                LOG.debug("Setting ignore_status to True.")
                kwargs["ignore_status"] = True
                if requires_root:
                    LOG.debug("Setting sudo to True.")
                    kwargs["sudo"] = True
            cmd = " ".join(command(service))
            result = run_func(cmd, **kwargs)
            return parse_func(result)

        return run


class _SysVInitServiceManager(
    _GenericServiceManager
):  # pylint: disable=too-few-public-methods
    """
    Concrete class that implements the SysVInitServiceManager
    """


def convert_sysv_runlevel(level):
    """
    Convert runlevel to systemd target.

    :param level: sys_v runlevel
    :type level: str or int
    :return: systemd target
    :rtype: str
    :raise ValueError: when runlevel is unknown
    """
    runlevel = str(level)
    if runlevel == "0":
        target = "poweroff.target"
    elif runlevel in ["1", "s", "single"]:
        target = "rescue.target"
    elif runlevel in ["2", "3", "4"]:
        target = "multi-user.target"
    elif runlevel == "5":
        target = "graphical.target"
    elif runlevel == "6":
        target = "reboot.target"
    else:
        raise ValueError(f"unknown runlevel {level}")
    return target


def convert_systemd_target_to_runlevel(target):
    """
    Convert systemd target to runlevel.

    :param target: systemd target
    :type target: str
    :return: sys_v runlevel
    :rtype: str
    :raise ValueError: when systemd target is unknown
    """
    if target == "poweroff.target":
        runlevel = "0"
    elif target == "rescue.target":
        runlevel = "s"
    elif target == "multi-user.target":
        runlevel = "3"
    elif target == "graphical.target":
        runlevel = "5"
    elif target == "reboot.target":
        runlevel = "6"
    else:
        raise ValueError(f"unknown target {target}")
    return runlevel


class _SystemdServiceManager(_GenericServiceManager):
    """
    Concrete class that implements the SystemdServiceManager
    """

    @staticmethod
    def change_default_runlevel(runlevel="multi-user.target"):
        # atomic symlinking, symlink and then rename
        """
        Set the default systemd target.
        Create the symlink in a temp directory and then use
        atomic rename to move the symlink into place.

        :param runlevel: default systemd target
        :type runlevel: str
        """
        tmp_symlink = mkstemp(dir="/etc/systemd/system")
        os.symlink(f"/usr/lib/systemd/system/{runlevel}", tmp_symlink)
        os.rename(tmp_symlink, "/etc/systemd/system/default.target")


_COMMAND_GENERATORS = {
    "init": sys_v_init_command_generator,
    "systemd": systemd_command_generator,
}

_RESULT_PARSERS = {"init": sys_v_init_result_parser, "systemd": systemd_result_parser}

_SERVICE_MANAGERS = {"init": _SysVInitServiceManager, "systemd": _SystemdServiceManager}


def service_manager(run=process.run):
    """
    Detect which init program is being used, init or systemd and return a
    class has methods to start/stop services.

    Example of use:

    .. code-block:: python

      # Get the system service manager
      service_manager = ServiceManager()

      # Stating service/unit "sshd"
      service_manager.start("sshd")

      # Getting a list of available units
      units = service_manager.list()

      # Disabling and stopping a list of services
      services_to_disable = ['ntpd', 'httpd']

      for s in services_to_disable:
          service_manager.disable(s)
          service_manager.stop(s)

    :return: SysVInitServiceManager or SystemdServiceManager
    :rtype: _GenericServiceManager
    """
    init = get_name_of_init(run)
    internal_service_manager = _SERVICE_MANAGERS[init]
    internal_command_generator = _COMMAND_GENERATORS[init]
    internal_result_parser = _RESULT_PARSERS[init]

    internal_generator = _ServiceCommandGenerator(internal_command_generator)
    internal_parser = _ServiceResultParser(internal_result_parser)
    _service_manager = internal_service_manager(
        internal_generator, internal_parser, run=run
    )
    return _service_manager


ServiceManager = service_manager


def _auto_create_specific_service_result_parser(run=process.run):
    """
    Create a class that will create partial functions that generate
    result_parser for the current init command.

    :return: A ServiceResultParser for the auto-detected init command.
    :rtype: _ServiceResultParser
    """
    result_parser = _RESULT_PARSERS[get_name_of_init(run)]
    # remove list method
    command_list = [(c, r) for (c, r) in COMMANDS if c not in ["list", "set_target"]]
    return _ServiceResultParser(result_parser, command_list)


def _auto_create_specific_service_command_generator(run=process.run):
    """
    Create a class that will create partial functions that generate commands
    for the current init command.

    >>> my_generator = auto_create_specific_service_command_generator
    >>> lldpad = SpecificServiceManager("lldpad", my_generator())
    >>> lldpad.start()
    >>> lldpad.stop()

    :return: A ServiceCommandGenerator for the auto-detected init command.
    :rtype: _ServiceCommandGenerator
    """
    command_generator = _COMMAND_GENERATORS[get_name_of_init(run)]
    # remove list method
    command_list = [(c, r) for (c, r) in COMMANDS if c not in ["list", "set_target"]]
    return _ServiceCommandGenerator(command_generator, command_list)


def specific_service_manager(service_name, run=process.run):
    """Get the service manager for a specific service.

    Example of use:

    .. code-block:: python

      # Get the specific service manager for sshd
      sshd = SpecificServiceManager("sshd")
      sshd.start()
      sshd.stop()
      sshd.reload()
      sshd.restart()
      sshd.condrestart()
      sshd.status()
      sshd.enable()
      sshd.disable()
      sshd.is_enabled()

    :param service_name: systemd unit or init.d service to manager
    :type service_name: str
    :return: SpecificServiceManager that has start/stop methods
    :rtype: _SpecificServiceManager
    """
    init = get_name_of_init(run)
    result_parser = _RESULT_PARSERS[init]
    specific_generator = _auto_create_specific_service_command_generator
    return _SpecificServiceManager(
        service_name, specific_generator(run), _ServiceResultParser(result_parser), run
    )


SpecificServiceManager = specific_service_manager