avocado-framework/avocado

View on GitHub
avocado/utils/software_manager/backends/rpm.py

Summary

Maintainability
B
5 hrs
Test Coverage
F
25%
import logging
import os
import re
import shlex

from avocado.utils import path as utils_path
from avocado.utils import process
from avocado.utils.software_manager.backends.base import BaseBackend

# Module-level logger used by the backend methods in this file.
LOG = logging.getLogger(__name__)


class RpmBackend(BaseBackend):
    """
    This class implements operations executed with the rpm package manager.

    rpm is a lower level package manager, used by higher level managers such
    as yum and zypper.
    """

    # Package type label; it is also the first field of the query format below.
    PACKAGE_TYPE = "rpm"
    # rpm query format used by list_all(): the package type followed by the
    # NAME, VERSION, RELEASE, SIGMD5 and ARCH query tags, space separated.
    SOFTWARE_COMPONENT_QRY = (
        PACKAGE_TYPE + " " + "%{NAME} %{VERSION} %{RELEASE} %{SIGMD5} %{ARCH}"
    )

    def __init__(self):
        """
        Look up the ``rpm`` command and keep the result as the low level
        base command used when building rpm queries.
        """
        rpm_command = utils_path.find_command("rpm")
        self.lowlevel_base_cmd = rpm_command

    def _check_installed_version(self, name, version):
        """
        Helper for the check_installed public method.

        Queries rpm for the installed version of [name] and reports whether
        it is at least [version].

        :param name: Package name.
        :param version: Minimum package version. When it is None (or empty)
                        any installed version is accepted.
        :returns: True if the package is installed and not older than
                  [version], False otherwise.
        :rtype: bool
        """
        cmd = self.lowlevel_base_cmd + " -q --qf %{VERSION} " + name
        inst_version = process.run(cmd, ignore_status=True).stdout_text

        if "not installed" in inst_version:
            return False

        # No minimum requested: being installed is enough. Comparing the
        # installed version against None would raise TypeError.
        if not version:
            return True

        # A plain string comparison would rank "9.0" above "10.0", so the
        # versions are compared segment by segment instead.
        return self._compare_versions(inst_version.strip(), str(version)) >= 0

    @staticmethod
    def _compare_versions(first, second):
        """
        Compare two version strings following the rpm ordering rules.

        Each string is split into runs of digits and runs of letters; any
        other character only acts as a separator. Numeric segments are
        compared as integers, a numeric segment is newer than an alphabetic
        one, and when all shared segments are equal the string with extra
        segments is newer. A "~" segment sorts before everything (even the
        end of the string) and a "^" segment sorts after the end of the
        string but before any other segment.

        :param str first: first version string
        :param str second: second version string
        :returns: -1 if first is older, 0 if both are equal, 1 if first is
                  newer
        :rtype: int
        """
        segment_re = r"[0-9]+|[a-zA-Z]+|[~^]"
        first_segments = re.findall(segment_re, first)
        second_segments = re.findall(segment_re, second)

        for index in range(max(len(first_segments), len(second_segments))):
            one = first_segments[index] if index < len(first_segments) else None
            two = second_segments[index] if index < len(second_segments) else None
            if one == two:
                continue
            # tilde marks a pre-release: older than anything else
            if one == "~":
                return -1
            if two == "~":
                return 1
            # caret marks a post-release snapshot: newer than the bare
            # version, older than any further regular segment
            if one == "^":
                return 1 if two is None else -1
            if two == "^":
                return -1 if one is None else 1
            # the string that still has segments left is the newer one
            if one is None:
                return -1
            if two is None:
                return 1
            if one.isdigit() and two.isdigit():
                if int(one) == int(two):
                    # same number written differently, e.g. "01" and "1"
                    continue
                return 1 if int(one) > int(two) else -1
            if one.isdigit() != two.isdigit():
                return 1 if one.isdigit() else -1
            return 1 if one > two else -1
        return 0

    def check_installed(self, name, version=None, arch=None):
        """
        Check if package [name] is installed.

        When [arch] is given the package must be installed for that
        architecture; when [version] is given the installed version is
        checked by _check_installed_version. With neither, a plain
        ``rpm -q`` decides.

        :param name: Package name.
        :param version: Package version.
        :param arch: Package architecture.
        :returns: whether the package is installed and satisfies the
                  requested constraints
        :rtype: bool
        """
        if arch:
            cmd = self.lowlevel_base_cmd + " -q --qf %{ARCH} " + name
            # Read the decoded text (as _check_installed_version does):
            # process.system_output() hands back raw bytes, which can be
            # neither split with a str separator nor compared with [arch].
            inst_archs = process.run(cmd, ignore_status=True).stdout_text
            # NOTE(review): the query format carries no line separator, so
            # several installed architectures of one package would be
            # printed back to back -- confirm whether that case matters.
            if arch not in inst_archs.splitlines():
                return False
            # Only consult the version helper when a version was requested.
            if version:
                return self._check_installed_version(name, version)
            return True

        if version:
            return self._check_installed_version(name, version)

        cmd = "rpm -q " + name
        try:
            process.system(cmd)
        except process.CmdError:
            return False
        return True

    def list_all(self, software_components=True):
        """
        List all installed packages.

        The listing is produced by ``rpm -qa`` piped through ``sort``.

        :param software_components: log in a format suitable for the
                                    SoftwareComponent schema
        :returns: one entry per line of the sorted rpm output
        :rtype: [str]
        """
        LOG.debug("Listing all system packages (may take a while)")

        if software_components:
            # one line per package, laid out as SOFTWARE_COMPONENT_QRY
            query_format = f"{self.SOFTWARE_COMPONENT_QRY}\n"
            command = "rpm -qa --qf '%s' | sort" % query_format
        else:
            command = "rpm -qa | sort"

        # the pipe requires the command to go through a shell
        listing = process.run(command, verbose=False, shell=True).stdout_text
        return listing.strip().splitlines()

    @staticmethod
    def list_files(name):
        """
        List files installed on the system by package [name].

        :param name: Package name, or path to an existing rpm package file
                     (in which case the file itself is queried).
        :returns: the listed files, one per output line; an empty list when
                  the rpm query fails
        :rtype: [str]
        """
        path = os.path.abspath(name)
        if os.path.isfile(path):
            # an existing file is queried as a package file
            option = "-qlp"
            name = path
        else:
            option = "-ql"

        l_cmd = "rpm" + " " + option + " " + name

        try:
            # Use the decoded text: process.system_output() hands back raw
            # bytes, and splitting those with a str separator raises a
            # TypeError that the CmdError handler would not catch.
            result = process.run(l_cmd).stdout_text
        except process.CmdError:
            return []
        # splitlines() ignores the trailing newline and yields [] for
        # empty output
        return result.splitlines()

    @staticmethod
    def rpm_install(file_path, no_dependencies=False, replace=False):
        """
        Install the rpm file [file_path] provided.

        :param str file_path: file path of the installed package
        :param bool no_dependencies: whether to add "nodeps" flag
        :param bool replace: whether to replace existing package ("-U"
                             is used instead of "-i")
        :returns: whether file is installed properly
        :rtype: bool
        """
        if not os.path.isfile(file_path):
            LOG.warning("Please provide proper rpm path")
            return False

        if replace:
            mode_flag = "-U"
        else:
            mode_flag = "-i"
        deps_flag = "--nodeps " if no_dependencies else ""
        command = f"rpm {mode_flag} {deps_flag}{file_path}"

        try:
            process.system(command)
        except process.CmdError as details:
            LOG.error(details)
            return False
        return True

    @staticmethod
    def rpm_verify(package_name):
        """
        Verify an RPM package with an installed one.

        The verification is considered successful when ``rpm -V`` exits
        with status 0; otherwise its output is logged.

        :param str package_name: name of the verified package
        :returns: whether the verification was successful
        :rtype: bool
        """
        LOG.info("Verifying package information.")
        outcome = process.run("rpm -V " + package_name, ignore_status=True)

        if outcome.exit_status != 0:
            LOG.info(outcome.stdout_text.rstrip())
            return False

        LOG.info("Verification successful.")
        return True

    @staticmethod
    def rpm_erase(package_name):
        """
        Erase an RPM package.

        :param str package_name: name of the erased package
        :returns: whether file is erased properly (``rpm -e`` exited with
                  status 0)
        :rtype: bool
        """
        LOG.warning("Erasing rpm package %s", package_name)
        outcome = process.run("rpm -e " + package_name, ignore_status=True)
        return not outcome.exit_status

    @staticmethod
    def prepare_source(spec_file, dest_path=None, build_option=None):
        """
        Rpmbuild the spec path and return build dir

        :param spec_file: spec file to build
        :param dest_path: directory passed to rpmbuild as ``_builddir``;
                          it is required despite the None default
        :param build_option: rpmbuild option, "-bc" when not given
        :return path: build directory (first entry of dest_path in sorted
                      order), or an empty string when dest_path is missing,
                      rpmbuild fails or nothing is found under dest_path
        """
        if dest_path is None:
            LOG.error("Please provide a valid path")
            return ""

        if build_option is None:
            build_option = "-bc"
        build_option += f" --define '_builddir {dest_path}'"

        try:
            process.system(f"rpmbuild {build_option} {spec_file}")
        except process.CmdError as details:
            LOG.error(details)
            return ""

        # os.listdir() returns entries in arbitrary order and may return
        # none at all: sort for a deterministic pick and do not index into
        # an empty listing.
        entries = sorted(os.listdir(dest_path))
        if not entries:
            LOG.error("No build directory found in %s", dest_path)
            return ""
        return os.path.join(dest_path, entries[0])

    def find_rpm_packages(self, rpm_dir):
        """
        Extract product dependencies from a defined RPM directory and all its subdirectories.

        Every regular file whose name ends in ".rpm" is collected;
        subdirectories are searched recursively.

        :param str rpm_dir: directory to search in
        :returns: found RPM packages
        :rtype: [str]
        """
        subpacks = []
        # os.listdir() never reports "." or "..", so no filtering is needed
        for subpath in os.listdir(rpm_dir):
            new_filepath = os.path.join(rpm_dir, subpath)
            LOG.debug("Checking path for rpm %s", new_filepath)
            if os.path.isfile(new_filepath):
                # Require a literal ".rpm" suffix. The former pattern
                # r"\s*.rpm$" left the dot unescaped, so any character
                # followed by "rpm" (e.g. "foo.srpm") was accepted.
                if subpath.endswith(".rpm"):
                    LOG.info("Marking package %s for setup", new_filepath)
                    subpacks.append(new_filepath)
            elif os.path.isdir(new_filepath):
                subpacks += self.find_rpm_packages(new_filepath)
        return subpacks

    @staticmethod
    def is_valid(package_path):
        """Verifies if a package is a valid rpm file.

        The check runs ``rpm -qp`` on the absolute, user-expanded path.

        :param str package_path: .rpm package path.
        :returns: True if valid, otherwise false.
        :rtype: bool
        """
        rpm_file = os.path.abspath(os.path.expanduser(package_path))
        try:
            outcome = process.run(f"rpm -qp {rpm_file}")
        except process.CmdError:
            return False
        return outcome.exit_status == 0

    @staticmethod
    def extract_from_package(package_path, dest_path=None):
        """Extracts the package content to a specific destination path.

        :param str package_path: path to the rpm package.
        :param dest_path: destination path to extract the files. Default it
                          will be the current directory.
        :returns: the path of the extracted files.
        :rtype: str
        :raises: process.CmdError if the extraction command fails
        """
        abs_path = os.path.abspath(os.path.expanduser(package_path))
        dest = dest_path or os.path.curdir

        # The pipeline is interpreted by a shell, so both paths are quoted
        # to survive spaces and shell metacharacters.
        quoted_package = shlex.quote(abs_path)
        quoted_dest = shlex.quote(str(dest))

        # If something goes wrong process.run will raise a CmdError exception
        process.run(
            f"rpm2cpio {quoted_package} | cpio -dium -D {quoted_dest}", shell=True
        )
        return dest

    def perform_setup(self, packages, no_dependencies=False):
        """
        General RPM setup with automatic handling of dependencies based on
        install attempts.

        Packages that fail to install are retried in further rounds; the
        setup gives up once a whole round makes no progress.

        :param packages: the RPM packages to install in dependency-friendly order
        :type packages: [str]
        :param bool no_dependencies: forwarded to rpm_install
        :returns: whether setup completed successfully
        :rtype: bool
        """
        while packages:
            LOG.debug("Trying to install: %s", packages)
            failed_packages = []
            for package_path in packages:
                package_file = os.path.basename(package_path)
                # the name is the file name without its last two
                # dash-separated parts
                name_parts = package_file.split("-")[:-2]
                package_name = "-".join(name_parts)
                LOG.debug("%s -> %s", package_file, package_name)

                if self.check_installed(package_name):
                    if self.rpm_verify(package_name):
                        # installed and verified: nothing to do
                        continue
                    # installed but failing verification: reinstall it
                    self.rpm_erase(package_name)

                if not self.rpm_install(package_path, no_dependencies):
                    failed_packages.append(package_path)

            # every package of this round failed: retrying cannot help
            if len(failed_packages) == len(packages):
                LOG.warning(
                    "Some of the rpm packages could not be installed: %s",
                    ", ".join(failed_packages),
                )
                return False
            packages = failed_packages
        return True