avocado-framework/avocado

View on GitHub
avocado/utils/vmimage.py

Summary

Maintainability
D
1 day
Test Coverage
D
65%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2017
# Author: Amador Pahim <apahim@redhat.com>

"""
Provides VM images acquired from official repositories
"""

import os
import re
import tempfile
import uuid
import warnings
from html.parser import HTMLParser
from urllib.error import HTTPError
from urllib.request import urlopen

from avocado.utils import archive, asset, astring
from avocado.utils import path as utils_path
from avocado.utils import process

# pylint: disable=C0401
#: The "qemu-img" binary used when creating the snapshot images.  If
#: set to None (the default), it will attempt to find a suitable binary
#: with :func:`avocado.utils.path.find_command`, which uses the the
#: system's PATH environment variable
QEMU_IMG = None


#: Default image architecture, which defaults to the current machine
#: architecture, if that's can be retrieved via :func:`os.uname`. If
#: not, it defaults to "x86_64" simply because it's the most commonly
#: used architecture and lack of a better default
if hasattr(os, "uname"):
    DEFAULT_ARCH = os.uname()[4]
else:
    DEFAULT_ARCH = "x86_64"


class ImageProviderError(Exception):
    """
    Generic error class for ImageProvider
    """


class VMImageHtmlParser(HTMLParser):  # pylint: disable=W0223
    """
    Custom HTML parser to extract the href items that match
    a given pattern
    """

    def __init__(self, pattern):
        HTMLParser.__init__(self)
        self.items = []
        self.pattern = pattern

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        for attr in attrs:
            if attr[0] == "href" and re.match(self.pattern, attr[1]):
                match = attr[1].strip("/")
                if match not in self.items:
                    self.items.append(match)


class ImageProviderBase:
    """
    Base class to define the common methods and attributes of an
    image. Intended to be sub-classed by the specific image providers.
    """

    HTML_ENCODING = "utf-8"

    def __init__(self, version, build, arch):

        self.url_versions = None
        self.url_images = None
        self.image_pattern = None
        self._file_name = None
        self._version = version
        self._best_version = None
        self.build = build
        self.arch = arch

    @property
    def version(self):
        return self._best_version or self.get_version()

    @property
    def version_pattern(self):
        return f"^{self._version}/$"

    @property
    def file_name(self):
        if self._file_name is None:
            self._file_name = self.image_pattern.format(
                version=self.version, build=self.build, arch=self.arch
            )
        return self._file_name

    def _feed_html_parser(self, url, parser):
        try:
            data = urlopen(url).read()
            parser.feed(astring.to_text(data, self.HTML_ENCODING))
        except HTTPError:
            raise ImageProviderError(f"Cannot open {self.url_versions}")

    @staticmethod
    def get_best_version(versions):
        return max(versions)

    def get_versions(self):
        """Return all available versions for the current parameters."""
        parser = VMImageHtmlParser(self.version_pattern)
        self._feed_html_parser(self.url_versions, parser)

        resulting_versions = []
        if parser.items:
            for version in parser.items:
                # Trying to convert version to int or float so max()
                # can compare numerical values.
                try:
                    # Can it be converted to integer?
                    resulting_versions.append(int(version))
                except ValueError:
                    try:
                        # Can it be converted to float?
                        resulting_versions.append(float(version))
                    except ValueError:
                        # So it's just a string
                        resulting_versions.append(version)

        return resulting_versions

    def get_version(self):
        """Probes the higher version available for the current parameters."""
        resulting_versions = self.get_versions()
        if resulting_versions:
            self._best_version = self.get_best_version(resulting_versions)
            return self._best_version
        else:
            raise ImageProviderError(
                f"Version not available at " f"{self.url_versions}"
            )

    def get_image_url(self):
        """
        Probes the higher image available for the current parameters.
        """
        if not self.url_images or not self.image_pattern:
            raise ImageProviderError(
                "url_images and image_pattern attributes are required to get image url"
            )

        url_images = self.url_images.format(
            version=self.version, build=self.build, arch=self.arch
        )
        image = self.image_pattern.format(
            version=self.version, build=self.build, arch=self.arch
        )
        parser = VMImageHtmlParser(image)

        self._feed_html_parser(url_images, parser)

        if parser.items:
            return url_images + max(parser.items)
        else:
            raise ImageProviderError(
                f"No images matching '{image}' " f"at '{url_images}'. Wrong arch?"
            )

    def get_image_parameters(self, image_file_name):
        """
        Computation of image parameters from image_pattern

        :param image_file_name: pattern with parameters
        :type image_file_name: str
        :return: dict with parameters
        :rtype: dict or None
        """
        keywords = re.split(r"\{(.*?)\}", self.image_pattern)[1::2]
        matches = re.match(self.file_name, image_file_name)

        if not matches:
            return None

        return {x: matches.group(x) for x in keywords}


class FedoraImageProviderBase(ImageProviderBase):
    """
    Base Fedora Image Provider
    """

    HTML_ENCODING = "iso-8859-1"
    url_old_images = None

    def get_image_url(self):
        if int(self.version) >= 28:
            cloud = "Cloud"
        else:
            cloud = "CloudImages"

        if self.url_old_images and int(self.version) <= 36:
            self.url_versions = self.url_old_images

        self.url_images = self.url_versions + "{version}/" + cloud + "/{arch}/images/"
        return super().get_image_url()


class FedoraImageProvider(FedoraImageProviderBase):
    """
    Fedora Image Provider
    """

    name = "Fedora"

    def __init__(self, version="[0-9]+", build="[0-9]+.[0-9]+", arch=DEFAULT_ARCH):
        super().__init__(version, build, arch)
        self.url_versions = "https://dl.fedoraproject.org/pub/fedora/linux/releases/"
        self.url_old_images = (
            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases/"
        )
        self.image_pattern = "Fedora-Cloud-Base-(?P<version>{version})-(?P<build>{build}).(?P<arch>{arch}).qcow2$"


class FedoraSecondaryImageProvider(FedoraImageProviderBase):
    """
    Fedora Secondary Image Provider
    """

    name = "FedoraSecondary"

    def __init__(self, version="[0-9]+", build="[0-9]+.[0-9]+", arch=DEFAULT_ARCH):
        super().__init__(version, build, arch)

        self.url_versions = (
            "https://dl.fedoraproject.org/pub/fedora-secondary/releases/"
        )
        self.url_old_images = (
            "https://archives.fedoraproject.org/pub/archive/fedora-secondary/releases/"
        )
        self.image_pattern = "Fedora-Cloud-Base-(?P<version>{version})-(?P<build>{build}).(?P<arch>{arch}).qcow2$"


class CentOSImageProvider(ImageProviderBase):
    """
    CentOS Image Provider
    """

    name = "CentOS"

    def __init__(self, version="[0-9]+", build="[0-9]{4}", arch=DEFAULT_ARCH):
        super().__init__(version, build, arch)
        self.url_versions = "https://cloud.centos.org/centos/"
        self.url_images = self.url_versions + "{version}/images/"
        self.image_pattern = "CentOS-(?P<version>{version})-(?P<arch>{arch})-GenericCloud-(?P<build>{build}).qcow2.xz$"

    def _set_patterns(self):
        if int(self.version) >= 8:
            self.build = r"[\d\.\-]+"
            self.url_images = self.url_versions + "{version}/{arch}/images/"
            self.image_pattern = "CentOS-(?P<version>{version})-GenericCloud-(?P<build>{build}).(?P<arch>{arch}).qcow2$"

    @property
    def file_name(self):
        self._set_patterns()
        return super().file_name

    def get_image_url(self):
        self._set_patterns()
        return super().get_image_url()


class UbuntuImageProvider(ImageProviderBase):
    """
    Ubuntu Image Provider
    """

    name = "Ubuntu"

    def __init__(self, version="[0-9]+.[0-9]+", build=None, arch=DEFAULT_ARCH):
        # Ubuntu uses 'amd64' instead of 'x86_64'
        if arch == "x86_64":
            arch = "amd64"
        # and 'arm64' instead of 'aarch64'
        elif arch == "aarch64":
            arch = "arm64"

        super().__init__(version, build, arch)
        self.url_versions = "http://cloud-images.ubuntu.com/releases/"
        self.url_images = self.url_versions + "releases/{version}/release/"
        self.image_pattern = (
            "ubuntu-(?P<version>{version})-server-cloudimg-(?P<arch>{arch}).img"
        )

    def get_best_version(self, versions):  # pylint: disable=W0221
        """Return best (more recent) version"""
        max_float = max(float(item) for item in versions)
        return str(f"{max_float:2.2f}")

    def get_versions(self):
        """Return all available versions for the current parameters."""
        parser = VMImageHtmlParser(self.version_pattern)
        self._feed_html_parser(self.url_versions, parser)

        resulting_versions = []
        if parser.items:
            for version in parser.items:
                max_float = float(version)
                resulting_versions.append(str(f"{max_float:2.2f}"))
        return resulting_versions


class DebianImageProvider(ImageProviderBase):
    """
    Debian Image Provider
    """

    name = "Debian"

    def __init__(self, version=None, build=r"[\d{8}\-\d{3}]", arch=DEFAULT_ARCH):
        # Debian uses 'amd64' instead of 'x86_64'
        if arch == "x86_64":
            arch = "amd64"
        # and 'arm64' instead of 'aarch64'
        elif arch == "aarch64":
            arch = "arm64"

        table_version = {
            "buster": "10",
            "bullseye": "11",
        }

        table_codename = {
            "10": "buster",
            "11": "bullseye",
        }

        # Default version if none was selected, should work at least until 2023 Q3
        if version is None:
            version = "bullseye"

        # User provided a numerical version
        if version in table_codename.keys():
            version = table_codename[version]

        # If version is not a codename by now, it's wrong or unknown,
        # so let's fail early
        if version not in table_version.keys():
            raise ImageProviderError("Unknown version", version)

        super().__init__(version, build, arch)
        self.url_versions = "https://cloud.debian.org/images/cloud/"
        self.url_images = self.url_versions + version + "/{build}/"
        self.image_pattern = (
            "debian-"
            + table_version[version]
            + "-generic-(?P<arch>{arch})-{build}.qcow2$"
        )

    def get_image_url(self):
        # Find out the build first
        parserbuild = VMImageHtmlParser(self.build)
        self._feed_html_parser(self.url_versions + self._version + "/", parserbuild)
        self.build = max(parserbuild.items)

        return super().get_image_url()


class JeosImageProvider(ImageProviderBase):
    """
    JeOS Image Provider
    """

    name = "JeOS"

    def __init__(self, version="[0-9]+", build=None, arch=DEFAULT_ARCH):
        # JeOS uses '64' instead of 'x86_64'
        if arch == "x86_64":
            arch = "64"

        super().__init__(version, build, arch)
        self.url_versions = "https://avocado-project.org/data/assets/jeos/"
        self.url_images = self.url_versions + "{version}/"
        self.image_pattern = "jeos-(?P<version>{version})-(?P<arch>{arch}).qcow2.xz$"


class OpenSUSEImageProvider(ImageProviderBase):
    """
    OpenSUSE Image Provider
    """

    HTML_ENCODING = "iso-8859-1"
    name = "OpenSUSE"

    def __init__(self, version="[0-9]{2}.[0-9]{1}", build=None, arch=DEFAULT_ARCH):
        super().__init__(version, build, arch)
        self.url_versions = (
            "https://download.opensuse.org/pub/opensuse/distribution/leap/"
        )
        self.url_images = self.url_versions + "{version}/appliances/"

        if not build:
            self.image_pattern = "openSUSE-Leap-(?P<version>{version})-JeOS.(?P<arch>{arch})-OpenStack-Cloud.qcow2$"

        else:
            self.image_pattern = (
                "openSUSE-Leap-(?P<version>{version})-JeOS.(?P<arch>{arch})-{version}"
                "-OpenStack-Cloud-Build(?P<build>{build}).qcow2$"
            )

    @staticmethod
    def _convert_version_numbers(versions):
        """
        Return float instead of strings
        """
        return [float(v) for v in versions]

    def get_versions(self):
        versions = super().get_versions()
        return self._convert_version_numbers(versions)

    def get_best_version(self, versions):  # pylint: disable=W0221
        version_numbers = self._convert_version_numbers(versions)
        if str(self._version).startswith("4"):
            version_numbers = [v for v in version_numbers if v >= 40.0]
        else:
            version_numbers = [v for v in version_numbers if v < 40.0]
        return max(version_numbers)


class CirrOSImageProvider(ImageProviderBase):
    """
    CirrOS Image Provider

    CirrOS is a Tiny OS that specializes in running on a cloud.
    """

    name = "CirrOS"

    def __init__(
        self, version=r"[0-9]+\.[0-9]+\.[0-9]+", build=None, arch=DEFAULT_ARCH
    ):
        super().__init__(version=version, build=build, arch=arch)
        self.url_versions = "https://download.cirros-cloud.net/"
        self.url_images = self.url_versions + "{version}/"
        self.image_pattern = "cirros-{version}-{arch}-disk.img$"


class FreeBSDImageProvider(ImageProviderBase):
    """
    FreeBSD Image Provider
    """

    name = "FreeBSD"

    def __init__(self, version="[0-9]+.[0-9]", build=None, arch=DEFAULT_ARCH):
        # FreeBSD uses 'amd64' instead of 'x86_64'
        if arch == "x86_64":
            arch = "amd64"

        super().__init__(version + "-RELEASE", build, arch)
        self.url_versions = (
            "http://ftp-archive.freebsd.org/pub/FreeBSD-Archive/old-releases/VM-IMAGES/"
        )
        self.url_images = self.url_versions + "{version}-RELEASE/{arch}/Latest/"

        arch2 = ""
        if arch == "aarch64":
            arch2 = "arm64-"
        elif arch == "riscv64":
            arch2 = "riscv-"
        self.image_pattern = (
            "FreeBSD-(?P<version>{version})-RELEASE-"
            + arch2
            + "(?P<arch>{arch}).qcow2.xz"
        )

    def get_best_version(self, versions):  # pylint: disable=W0221
        """Return best (more recent) version"""
        max_float = max(float(item) for item in versions)
        return str(f"{max_float:2.1f}")

    def get_versions(self):
        """Return all available versions for the current parameters."""
        parser = VMImageHtmlParser(self.version_pattern)
        self._feed_html_parser(self.url_versions, parser)

        resulting_versions = []
        if parser.items:
            for version in parser.items:
                max_float = float(version.split("-")[0])
                resulting_versions.append(str(f"{max_float:2.1f}"))

        return resulting_versions


class Image:
    def __init__(
        self,
        name,
        url,
        version,
        arch,
        build,
        checksum,
        algorithm,
        cache_dir,
        snapshot_dir=None,
    ):
        """
        Creates an instance of Image class.

        :param name: Name of image.
        :type name: str
        :param url: The url where the image can be fetched from.
        :type url: str
        :param version: Version of image.
        :type version: int
        :param arch: Architecture of the system image.
        :type arch: str
        :param build: Build of the system image.
        :type build: str
        :param checksum: Hash of the system image to match after download.
        :type checksum: str
        :param algorithm: Hash type, used when the checksum is provided.
        :type algorithm: str
        :param cache_dir: Local system path where the base images will be held.
        :type cache_dir: str or iterable
        :param snapshot_dir: Local system path where the snapshot images
                            will be held.  Defaults to cache_dir if none is given.
        :type snapshot_dir: str
        """
        self.name = name
        self.url = url
        self.version = version
        self.arch = arch
        self.build = build
        self.checksum = checksum
        self.algorithm = algorithm
        self.cache_dir = cache_dir
        self.snapshot_dir = snapshot_dir

        self._path = None
        self._base_image = None

    @property
    def path(self):
        return self._path or self.get()

    @property
    def base_image(self):
        return self._base_image or self.download()

    def __repr__(self):
        return (
            f"<{self.__class__.__name__} name={self.name} "
            f"version={self.version} arch={self.arch}>"
        )

    def download(self):
        metadata = {
            "type": "vmimage",
            "name": self.name,
            "version": self.version,
            "arch": self.arch,
            "build": self.build,
        }
        if isinstance(self.cache_dir, str):
            cache_dirs = [self.cache_dir]
        else:
            cache_dirs = self.cache_dir
        asset_path = asset.Asset(
            name=self.url,
            asset_hash=self.checksum,
            algorithm=self.algorithm,
            locations=None,
            cache_dirs=cache_dirs,
            expire=None,
            metadata=metadata,
        ).fetch(timeout=asset.DOWNLOAD_TIMEOUT * 3)

        if archive.is_archive(asset_path):
            uncompressed_path = os.path.splitext(asset_path)[0]
            asset_path = archive.uncompress(asset_path, uncompressed_path)
        self._base_image = asset_path
        return self._base_image

    def get(self):
        self.download()
        self._path = self._take_snapshot()
        return self._path

    def _take_snapshot(self):
        """
        Takes a snapshot from the current image.
        """
        if QEMU_IMG is None:
            qemu_img = utils_path.find_command("qemu-img")
        else:
            qemu_img = QEMU_IMG
        name, extension = os.path.splitext(self.base_image)
        new_image = f"{name}-{str(uuid.uuid4()).split('-', maxsplit=1)[0]}{extension}"
        if self.snapshot_dir is not None:
            new_image = os.path.join(self.snapshot_dir, os.path.basename(new_image))
        cmd = (
            f"{qemu_img} create -f qcow2 -b {self.base_image} -F " f"qcow2 {new_image}"
        )
        process.run(cmd)
        return new_image

    @classmethod
    def from_parameters(
        cls,
        name=None,
        version=None,
        build=None,
        arch=None,
        checksum=None,
        algorithm=None,
        cache_dir=None,
        snapshot_dir=None,
    ):
        """
        Returns an Image, according to the parameters provided.

        :param name: (optional) Name of the Image Provider, usually matches
                     the distro name.
        :param version: (optional) Version of the system image.
        :param build: (optional) Build number of the system image.
        :param arch: (optional) Architecture of the system image.
        :param checksum: (optional) Hash of the system image to match after
                         download.
        :param algorithm: (optional) Hash type, used when the checksum is
                          provided.
        :param cache_dir: (optional) Local system path where the base
                          images will be held.
        :param snapshot_dir: (optional) Local system path where the snapshot
                             images will be held.  Defaults to cache_dir if
                             none is given.

        :returns: Image instance that can provide the image
                  according to the parameters.
        """
        provider = get_best_provider(name, version, build, arch)

        if cache_dir is None:
            cache_dir = tempfile.gettempdir()
        try:
            return cls(
                name=provider.name,
                url=provider.get_image_url(),
                version=provider.version,
                arch=provider.arch,
                checksum=checksum,
                algorithm=algorithm,
                build=provider.build,
                cache_dir=cache_dir,
                snapshot_dir=snapshot_dir,
            )
        except ImageProviderError:
            pass

        raise AttributeError("Provider not available")


def get(
    name=None,
    version=None,
    build=None,
    arch=None,
    checksum=None,
    algorithm=None,
    cache_dir=None,
    snapshot_dir=None,
):
    """This method is deprecated. Use Image.from_parameters()."""
    warnings.warn(
        "deprecated, use Image.from_parameters() instead.", DeprecationWarning
    )
    return Image.from_parameters(
        name, version, build, arch, checksum, algorithm, cache_dir, snapshot_dir
    )


def get_best_provider(name=None, version=None, build=None, arch=None):
    """
    Wrapper to get parameters of the best Image Provider, according to
    the parameters provided.

    :param name: (optional) Name of the Image Provider, usually matches
                 the distro name.
    :param version: (optional) Version of the system image.
    :param build: (optional) Build number of the system image.
    :param arch: (optional) Architecture of the system image.

    :returns: Image Provider
    """

    if name is not None:
        name = name.lower()

    provider_args = {}
    if version is not None:
        provider_args["version"] = version
    if build is not None:
        provider_args["build"] = build
    if arch is not None:
        provider_args["arch"] = arch
        if name == "fedora" and arch in ("ppc64", "ppc64le", "s390x"):
            name = "fedorasecondary"

    for provider in IMAGE_PROVIDERS:
        if name is None or name == provider.name.lower():
            try:
                return provider(**provider_args)
            except ImageProviderError:
                pass

    raise AttributeError("Provider not available")


def list_providers():
    """
    List the available Image Providers
    """
    return set(
        _
        for _ in globals().values()
        if (
            isinstance(_, type)
            and issubclass(_, ImageProviderBase)
            and hasattr(_, "name")
        )
    )


#: List of available providers classes
IMAGE_PROVIDERS = list_providers()