avocado-framework/avocado

View on GitHub
avocado/utils/distro.py

Summary

Maintainability
C
1 day
Test Coverage
B
82%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2013-2014
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>

"""
This module provides the client facilities to detect the Linux Distribution
it's running under.
"""

import logging
import os
import platform
import re

LOGGER = logging.getLogger(__name__)

__all__ = [
    "LinuxDistro",
    "UNKNOWN_DISTRO_NAME",
    "UNKNOWN_DISTRO_VERSION",
    "UNKNOWN_DISTRO_RELEASE",
    "UNKNOWN_DISTRO_ARCH",
    "Probe",
    "register_probe",
    "detect",
]


# pylint: disable=R0903
class LinuxDistro:
    """
    Simple collection of information for a Linux Distribution
    """

    def __init__(self, name, version, release, arch):
        """
        Initializes a new Linux Distro

        :param name: a short name that precisely distinguishes this Linux
                     Distribution among all others.
        :type name: str
        :param version: the major version of the distribution. Usually this
                        is a single number that denotes a large development
                        cycle and support file.
        :type version: str
        :param release: the release or minor version of the distribution.
                        Usually this is also a single number, that is often
                        omitted or starts with a 0 when the major version
                        is initially release. It's often associated with a
                        shorter development cycle that contains incremental
                        a collection of improvements and fixes.
        :type release: str
        :param arch: the main target for this Linux Distribution. It's common
                     for some architectures to ship with packages for
                     previous and still compatible architectures, such as it's
                     the case with Intel/AMD 64 bit architecture that support
                     32 bit code. In cases like this, this should be set to
                     the 64 bit architecture name.
        :type arch: str
        """
        self.name = name
        self.version = version
        self.release = release
        self.arch = arch

    def __repr__(self):
        return (
            f"<LinuxDistro: name={self.name}, version={self.version}, "
            f"release={self.release}, arch={self.arch}>"
        )


UNKNOWN_DISTRO_NAME = "unknown"
UNKNOWN_DISTRO_VERSION = 0
UNKNOWN_DISTRO_RELEASE = 0
UNKNOWN_DISTRO_ARCH = "unknown"


#: The distribution that is used when the exact one could not be found
UNKNOWN_DISTRO = LinuxDistro(
    UNKNOWN_DISTRO_NAME,
    UNKNOWN_DISTRO_VERSION,
    UNKNOWN_DISTRO_RELEASE,
    UNKNOWN_DISTRO_ARCH,
)


class Probe:
    """
    Probes the machine and does it best to confirm it's the right distro.
    If given an avocado.utils.ssh.Session object representing another machine, Probe
    will attempt to detect another machine's distro via an ssh connection.
    """

    #: Points to a file that can determine if this machine is running a given
    #: Linux Distribution. This servers a first check that enables the extra
    #: checks to carry on.
    CHECK_FILE = None

    #: Sets the content that should be checked on the file pointed to by
    #: :attr:`CHECK_FILE_EXISTS`. Leave it set to `None` (its default)
    #: to check only if the file exists, and not check its contents
    CHECK_FILE_CONTAINS = None

    #: The name of the Linux Distribution to be returned if the file defined
    #: by :attr:`CHECK_FILE_EXISTS` exist.
    CHECK_FILE_DISTRO_NAME = None

    #: A regular expression that will be run on the file pointed to by
    #: :attr:`CHECK_FILE_EXISTS`
    CHECK_VERSION_REGEX = None

    def __init__(self, session=None):
        self.score = 0
        self.session = session

    def check_for_remote_file(self, file_name):
        """
        Checks if provided file exists in remote machine

        :param file_name: name of file
        :type file_name: str
        :returns: whether the file exists in remote machine or not
        :rtype: bool
        """
        if self.session and self.session.cmd(f"test -f {file_name}").exit_status == 0:
            return True
        else:
            return False

    def check_name_for_file(self):
        """
        Checks if this class will look for a file and return a distro

        The conditions that must be true include the file that identifies the
        distro file being set (:attr:`CHECK_FILE`) and the name of the
        distro to be returned (:attr:`CHECK_FILE_DISTRO_NAME`)
        """
        if self.CHECK_FILE is None:
            return False

        if self.CHECK_FILE_DISTRO_NAME is None:
            return False

        return True

    def name_for_file(self):
        """
        Get the distro name if the :attr:`CHECK_FILE` is set and exists
        """
        if self.check_name_for_file():
            if self.check_for_remote_file(self.CHECK_FILE):
                return self.CHECK_FILE_DISTRO_NAME
            elif os.path.exists(self.CHECK_FILE):
                return self.CHECK_FILE_DISTRO_NAME

    def check_name_for_file_contains(self):
        """
        Checks if this class will look for text on a file and return a distro

        The conditions that must be true include the file that identifies the
        distro file being set (:attr:`CHECK_FILE`), the text to look for
        inside the distro file (:attr:`CHECK_FILE_CONTAINS`) and the name
        of the distro to be returned (:attr:`CHECK_FILE_DISTRO_NAME`)
        """
        if self.CHECK_FILE is None:
            return False

        if self.CHECK_FILE_CONTAINS is None:
            return False

        if self.CHECK_FILE_DISTRO_NAME is None:
            return False

        return True

    def name_for_file_contains(self):
        """
        Get the distro if the :attr:`CHECK_FILE` is set and has content
        """
        if self.check_name_for_file_contains():
            check_file = None
            if self.check_for_remote_file(self.CHECK_FILE):
                check_file = self.session.cmd(
                    f"cat {self.CHECK_FILE}"
                ).stdout_text.split("/n")
            elif os.path.exists(self.CHECK_FILE):
                try:
                    check_file = open(self.CHECK_FILE, encoding="utf-8")
                except IOError as err:
                    LOGGER.debug("Could not open %s", self.CHECK_FILE)
                    LOGGER.debug("Exception: %s", str(err))
                    return None

            if not check_file:
                return None

            for line in check_file:
                if self.CHECK_FILE_CONTAINS in line:
                    return self.CHECK_FILE_DISTRO_NAME

    def check_version(self):
        """
        Checks if this class will look for a regex in file and return a distro
        """
        if self.CHECK_FILE is None:
            return False

        if self.CHECK_VERSION_REGEX is None:
            return False

        return True

    def _get_version_match(self):
        """
        Returns the match result for the version regex on the file content
        """
        if self.check_version():
            if self.session:
                if self.session.cmd(f"test -f {self.CHECK_FILE}").exit_status != 0:
                    return None
            elif not os.path.exists(self.CHECK_FILE):
                return None

            version_file_content = None
            if self.session:
                version_file_content = self.session.cmd(
                    f"cat {self.CHECK_FILE}"
                ).stdout_text
            else:
                try:
                    version_file_content = open(
                        self.CHECK_FILE, encoding="utf-8"
                    ).read()
                except IOError as err:
                    LOGGER.debug("Could not open %s", self.CHECK_FILE)
                    LOGGER.debug("Exception: %s", str(err))
                    return None

            return self.CHECK_VERSION_REGEX.match(version_file_content)

    def version(self):
        """
        Returns the version of the distro
        """
        version = UNKNOWN_DISTRO_VERSION
        match = self._get_version_match()
        if match is not None:
            if len(match.groups()) > 0:
                version = match.groups()[0]
        return version

    def check_release(self):
        """
        Checks if this has the conditions met to look for the release number
        """
        return self.check_version() and self.CHECK_VERSION_REGEX.groups > 1

    def release(self):
        """
        Returns the release of the distro
        """
        release = UNKNOWN_DISTRO_RELEASE
        match = self._get_version_match()
        if match is not None:
            if len(match.groups()) > 1:
                if match.groups()[1]:
                    release = match.groups()[1]
        return release

    def get_distro(self):
        """
        :param session: ssh connection between another machine

        Returns the :class:`LinuxDistro` this probe detected
        """
        name = None
        version = UNKNOWN_DISTRO_VERSION
        release = UNKNOWN_DISTRO_RELEASE

        distro = None

        if self.check_name_for_file():
            name = self.name_for_file()
            self.score += 1

        if self.check_name_for_file_contains():
            name = self.name_for_file_contains()
            self.score += 1

        if self.check_version():
            version = self.version()
            self.score += 1

        if self.check_release():
            release = self.release()
            self.score += 1

        if self.session:
            arch = self.session.cmd("uname -m").stdout_text
        else:
            arch = platform.machine()

        # name is the first thing that should be identified. If we don't know
        # the distro name, we don't bother checking for versions
        if name is not None:
            distro = LinuxDistro(name, version, release, arch)
        else:
            distro = UNKNOWN_DISTRO

        return distro


class RedHatProbe(Probe):
    """
    Probe with version checks for Red Hat Enterprise Linux systems
    """

    CHECK_FILE = "/etc/redhat-release"
    CHECK_FILE_CONTAINS = "Red Hat Enterprise Linux"
    CHECK_FILE_DISTRO_NAME = "rhel"
    CHECK_VERSION_REGEX = re.compile(
        r"Red Hat Enterprise Linux\s+\w*\s*release\s+(\d{1,2})\.(\d{1,2}).*"
    )


class CentosProbe(RedHatProbe):
    """
    Probe with version checks for CentOS systems
    """

    CHECK_FILE = "/etc/redhat-release"
    CHECK_FILE_CONTAINS = "CentOS Linux"
    CHECK_FILE_DISTRO_NAME = "centos"
    CHECK_VERSION_REGEX = re.compile(r"CentOS Linux release (\d{1,2})\.(\d{1,2}).*")


class CentosStreamProbe(RedHatProbe):
    """
    Probe with version checks for CentOS Stream systems
    """

    CHECK_FILE = "/etc/redhat-release"
    CHECK_FILE_CONTAINS = "CentOS Stream"
    CHECK_FILE_DISTRO_NAME = "centos-stream"
    CHECK_VERSION_REGEX = re.compile(r"CentOS Stream release (\d{1,2})")


class FedoraProbe(RedHatProbe):
    """
    Probe with version checks for Fedora systems
    """

    CHECK_FILE = "/etc/fedora-release"
    CHECK_FILE_CONTAINS = "Fedora"
    CHECK_FILE_DISTRO_NAME = "fedora"
    CHECK_VERSION_REGEX = re.compile(r"Fedora release (\d{1,2}).*")


class AmazonLinuxProbe(Probe):
    """
    Probe for Amazon Linux systems
    """

    CHECK_FILE = "/etc/os-release"
    CHECK_FILE_CONTAINS = "Amazon Linux"
    CHECK_FILE_DISTRO_NAME = "amzn"
    CHECK_VERSION_REGEX = re.compile(
        r".*VERSION=\"(\d+)\.?(\d+)?\".*", re.MULTILINE | re.DOTALL
    )


class DebianProbe(Probe):
    """
    Simple probe with file checks for Debian systems
    """

    CHECK_FILE = "/etc/debian_version"
    CHECK_FILE_DISTRO_NAME = "debian"
    CHECK_VERSION_REGEX = re.compile(r"(\d+)\.(\d+)")


class UbuntuProbe(Probe):
    """
    Simple probe for Ubuntu systems in general
    """

    CHECK_FILE = "/etc/os-release"
    CHECK_FILE_CONTAINS = "ubuntu"
    CHECK_FILE_DISTRO_NAME = "Ubuntu"
    CHECK_VERSION_REGEX = re.compile(
        r".*VERSION_ID=\"(\d+)\.(\d+)\".*", re.MULTILINE | re.DOTALL
    )


class SUSEProbe(Probe):
    """
    Simple probe for SUSE systems in general
    """

    CHECK_FILE = "/etc/os-release"
    CHECK_FILE_CONTAINS = "SUSE"
    # this is the (incorrect) spelling used in python's platform
    # and tests are looking for it in distro.name. So keep using it
    CHECK_FILE_DISTRO_NAME = "SuSE"

    def get_distro(self):
        distro = super().get_distro()

        # if the default methods find SUSE, detect version
        if not distro.name == self.CHECK_FILE_DISTRO_NAME:
            return distro

        # we need to check VERSION_ID, which is number - VERSION can
        # be a string

        # for openSUSE Tumbleweed this will be e.g. 20161225
        # for openSUSE Leap this will be e.g. 42.2
        # for SUSE Linux Enterprise this will be e.g. 12 or 12.2 (for SP2)
        version_id_re = re.compile(r'VERSION_ID="([\d\.]*)"')
        version_id = None

        with open(self.CHECK_FILE, encoding="utf-8") as check_file:
            for line in check_file:
                match = version_id_re.match(line)
                if match:
                    version_id = match.group(1)

        if version_id:
            version_parts = version_id.split(".")
            distro.version = int(version_parts[0])
            if len(version_parts) > 1:
                distro.release = int(version_parts[1])

        return distro


class OpenEulerProbe(Probe):
    """
    Simple probe for openEuler systems in general
    """

    CHECK_FILE = "/etc/openEuler-release"
    CHECK_FILE_CONTAINS = "openEuler release"
    CHECK_FILE_DISTRO_NAME = "openEuler"
    CHECK_VERSION_REGEX = re.compile(r"openEuler release (\d+)\.(\d+).*")


class UnionTechProbe(Probe):
    """
    Simple probe for UnionTech systems in general
    """

    CHECK_FILE = "/etc/os-version"
    CHECK_FILE_CONTAINS = "UnionTech OS"
    CHECK_FILE_DISTRO_NAME = "uos"
    CHECK_VERSION_REGEX = re.compile(r"MinorVersion=(\d+)")


#: the complete list of probes that have been registered
REGISTERED_PROBES = []


def register_probe(probe_class):
    """
    Register a probe to be run during autodetection
    """
    if probe_class not in REGISTERED_PROBES:
        REGISTERED_PROBES.append(probe_class)


register_probe(RedHatProbe)
register_probe(CentosProbe)
register_probe(CentosStreamProbe)
register_probe(FedoraProbe)
register_probe(AmazonLinuxProbe)
register_probe(DebianProbe)
register_probe(SUSEProbe)
register_probe(UbuntuProbe)
register_probe(OpenEulerProbe)
register_probe(UnionTechProbe)


def detect(session=None):
    """
    Attempts to detect the Linux Distribution running on this machine.

    If given an avocado.utils.ssh.Session object, it will attempt to detect the
    distro of another machine via an ssh connection.

    :param session: ssh connection between another machine
    :type session: avocado.utils.ssh.Session
    :returns: the detected :class:`LinuxDistro` or :data:`UNKNOWN_DISTRO`
    :rtype: :class:`LinuxDistro`
    """
    results = []

    for probe_class in REGISTERED_PROBES:
        probe_instance = probe_class(session)
        distro_result = probe_instance.get_distro()
        if distro_result is not UNKNOWN_DISTRO:
            results.append((distro_result, probe_instance))

    results.sort(key=lambda t: t[1].score)
    if len(results) > 0:
        distro = results[-1][0]
    else:
        distro = UNKNOWN_DISTRO

    return distro


class Spec:
    """
    Describes a distro, usually for setting minimum distro requirements
    """

    def __init__(self, name, min_version=None, min_release=None, arch=None):
        self.name = name
        self.min_version = min_version
        self.min_release = min_release
        self.arch = arch