avocado-framework/avocado

View on GitHub
selftests/unit/utils/archive.py

Summary

Maintainability
B
5 hrs
Test Coverage
import os
import random
import sys
import tempfile
import unittest

from avocado.utils import archive, crypto, data_factory
from selftests.utils import BASEDIR, temp_dir_prefix

ZSTD_AVAILABLE = archive._probe_zstd_cmd() is not None


class ArchiveTest(unittest.TestCase):
    def setUp(self):
        prefix = temp_dir_prefix(self)
        self.basedir = tempfile.TemporaryDirectory(prefix=prefix)
        self.compressdir = tempfile.mkdtemp(dir=self.basedir.name)
        self.decompressdir = tempfile.mkdtemp(dir=self.basedir.name)
        self.sys_random = random.SystemRandom()

    def compress_and_check_dir(self, extension):
        hash_map_1 = {}
        for i in range(self.sys_random.randint(10, 20)):
            if i % 2 == 0:
                compressdir = tempfile.mkdtemp(dir=self.compressdir)
            else:
                compressdir = self.compressdir
            str_length = self.sys_random.randint(30, 50)
            fd, filename = tempfile.mkstemp(dir=compressdir, text=True)
            with os.fdopen(fd, "w") as f:
                f.write(data_factory.generate_random_string(str_length))
            relative_path = filename.replace(self.compressdir, "")
            hash_map_1[relative_path] = crypto.hash_file(filename)

        archive_filename = self.compressdir + extension
        archive.compress(archive_filename, self.compressdir)
        archive.uncompress(archive_filename, self.decompressdir)

        hash_map_2 = {}
        for root, _, files in os.walk(self.decompressdir):
            for name in files:
                file_path = os.path.join(root, name)
                relative_path = file_path.replace(self.decompressdir, "")
                hash_map_2[relative_path] = crypto.hash_file(file_path)

        self.assertEqual(hash_map_1, hash_map_2)

    def compress_and_check_file(self, extension):
        str_length = self.sys_random.randint(30, 50)
        fd, filename = tempfile.mkstemp(dir=self.basedir.name, text=True)
        with os.fdopen(fd, "w") as f:
            f.write(data_factory.generate_random_string(str_length))
        original_hash = crypto.hash_file(filename)
        dstfile = filename + extension
        archive_filename = os.path.join(self.basedir.name, dstfile)
        archive.compress(archive_filename, filename)
        ret = archive.uncompress(archive_filename, self.decompressdir)
        self.assertEqual(ret, os.path.basename(filename))
        decompress_file = os.path.join(self.decompressdir, os.path.basename(filename))
        decompress_hash = crypto.hash_file(decompress_file)
        self.assertEqual(original_hash, decompress_hash)

    def test_zip_dir(self):
        self.compress_and_check_dir(".zip")

    def test_zip_file(self):
        self.compress_and_check_file(".zip")

    def test_tar_dir(self):
        self.compress_and_check_dir(".tar")

    def test_tar_file(self):
        self.compress_and_check_file(".tar")

    def test_tgz_dir(self):
        self.compress_and_check_dir(".tar.gz")

    def test_tgz_file(self):
        self.compress_and_check_file(".tar.gz")

    def test_tgz_2_dir(self):
        self.compress_and_check_dir(".tgz")

    def test_tgz_2_file(self):
        self.compress_and_check_file(".tgz")

    def test_tbz2_dir(self):
        self.compress_and_check_dir(".tar.bz2")

    def test_tbz2_file(self):
        self.compress_and_check_file(".tar.bz2")

    def test_tbz2_2_dir(self):
        self.compress_and_check_dir(".tbz2")

    def test_tbz2_2_file(self):
        self.compress_and_check_file(".tbz2")

    @unittest.skipIf(
        sys.platform.startswith("darwin"),
        "macOS does not support archive extra attributes",
    )
    def test_zip_extra_attrs(self):
        """
        Check that utils.archive reflects extra attrs of file like symlinks
        and file permissions.
        """

        def get_path(*args):
            """Get path with decompressdir prefix"""
            return os.path.join(self.decompressdir, *args)

        # File types
        zip_path = os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                os.path.pardir,
                os.path.pardir,
                ".data",
                "test_archive__symlinks.zip",
            )
        )
        # TODO: Handle permission correctly for all users
        # The umask is not yet handled by utils.archive, hardcode it for now
        os.umask(2)
        archive.uncompress(zip_path, self.decompressdir)
        self.assertTrue(os.path.islink(get_path("link_to_dir")))
        self.assertTrue(os.path.islink(get_path("link_to_file")))
        self.assertTrue(os.path.islink(get_path("link_to_file2")))
        self.assertTrue(os.path.islink(get_path("dir", "2nd_link_to_file")))
        self.assertTrue(os.path.islink(get_path("dir", "link_to_link_to_file2")))
        self.assertTrue(os.path.islink(get_path("dir", "2nd_link_to_file")))
        self.assertTrue(os.path.islink(get_path("link_to_dir", "2nd_link_to_file")))
        self.assertTrue(os.path.isfile(get_path("file")))
        self.assertTrue(os.path.isfile(get_path("dir", "file2")))
        self.assertTrue(os.path.isfile(get_path("link_to_dir", "file2")))
        act = os.path.realpath(get_path("link_to_dir", "link_to_link_to_file2"))
        exp = get_path("dir", "file2")
        self.assertEqual(act, exp)
        self.assertEqual(os.path.realpath(get_path("link_to_dir")), get_path("dir"))
        # File permissions
        self.assertEqual(os.stat(get_path("dir", "file2")).st_mode & 0o777, 0o664)
        self.assertEqual(os.stat(get_path("file")).st_mode & 0o777, 0o753)
        self.assertEqual(os.stat(get_path("dir")).st_mode & 0o777, 0o775)
        self.assertEqual(os.stat(get_path("link_to_file2")).st_mode & 0o777, 0o664)
        self.assertEqual(os.stat(get_path("link_to_dir")).st_mode & 0o777, 0o775)
        self.assertEqual(os.stat(get_path("link_to_file")).st_mode & 0o777, 0o753)

    def test_empty_tbz2(self):
        ret = archive.uncompress(
            os.path.join(BASEDIR, "selftests", ".data", "empty.tar.bz2"),
            self.decompressdir,
        )
        self.assertEqual(ret, None, (f"Empty archive should return None " f"({ret})"))

    def test_is_gzip_file(self):
        gz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.gz")
        self.assertTrue(archive.is_gzip_file(gz_path))

    def test_gzip_uncompress_to_dir(self):
        gz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.gz")
        ret = archive.gzip_uncompress(gz_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))

    def test_gzip_uncompress_to_file(self):
        gz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.gz")
        filename = os.path.join(self.decompressdir, "other")
        ret = archive.gzip_uncompress(gz_path, filename)
        self.assertEqual(ret, filename)

    def test_gzip_is_archive(self):
        gz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.gz")
        self.assertTrue(archive.is_archive(gz_path))

    def test_uncompress_gzip(self):
        gz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.gz")
        ret = archive.uncompress(gz_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))
        with open(ret, "rb") as decompressed:
            self.assertEqual(decompressed.read(), b"avocado\n")

    def test_is_lzma_file(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.xz")
        self.assertTrue(archive.is_lzma_file(xz_path))

    def test_null_is_not_lzma_file(self):
        self.assertFalse(archive.is_lzma_file(os.devnull))

    def test_lzma_uncompress_to_dir(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.xz")
        ret = archive.lzma_uncompress(xz_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))

    def test_lzma_uncompress_to_file(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.xz")
        filename = os.path.join(self.decompressdir, "other")
        ret = archive.lzma_uncompress(xz_path, filename)
        self.assertEqual(ret, filename)

    def test_lzma_is_archive(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.xz")
        self.assertTrue(archive.is_archive(xz_path))

    def test_uncompress_lzma(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.xz")
        ret = archive.uncompress(xz_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))
        with open(ret, "rb") as decompressed:
            self.assertEqual(decompressed.read(), b"avocado\n")

    def test_is_zstd_file(self):
        xz_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.zst")
        self.assertTrue(archive.is_zstd_file(xz_path))

    def test_null_is_not_zstd_file(self):
        self.assertFalse(archive.is_zstd_file(os.devnull))

    def test_zstd_is_archive(self):
        zstd_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.zst")
        self.assertTrue(archive.is_archive(zstd_path))

    @unittest.skipUnless(ZSTD_AVAILABLE, "zstd tool is not available")
    def test_zstd_uncompress_to_dir(self):
        zstd_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.zst")
        ret = archive.zstd_uncompress(zstd_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))

    @unittest.skipUnless(ZSTD_AVAILABLE, "zstd tool is not available")
    def test_zstd_uncompress_to_file(self):
        zstd_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.zst")
        filename = os.path.join(self.decompressdir, "other")
        ret = archive.zstd_uncompress(zstd_path, filename)
        self.assertEqual(ret, filename)

    @unittest.skipUnless(ZSTD_AVAILABLE, "zstd tool is not available")
    def test_uncompress_zstd(self):
        zstd_path = os.path.join(BASEDIR, "selftests", ".data", "avocado.zst")
        ret = archive.uncompress(zstd_path, self.decompressdir)
        self.assertEqual(ret, os.path.join(self.decompressdir, "avocado"))
        with open(ret, "rb") as decompressed:
            self.assertEqual(decompressed.read(), b"avocado\n")

    def test_null_is_not_archive(self):
        self.assertFalse(archive.is_archive(os.devnull))

    def tearDown(self):
        try:
            self.basedir.cleanup()
        except OSError:
            pass


if __name__ == "__main__":
    unittest.main()