avocado-framework/avocado

View on GitHub
avocado/utils/partition.py

Summary

Maintainability
C
1 day
Test Coverage
F
26%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: 2016 IBM.
# Author: Rajashree Rajendran<rajashr7@linux.vnet.ibm.com>
# Copyright: 2016 Red Hat, Inc.
# Author: Lukas Doktor <ldoktor@redhat.com>
#
# Based on code by: Martin Bligh (mbligh@google.com)
#     Copyright: Google 2006-2008
#     https://github.com/autotest/autotest/blob/master/client/partition.py

"""
Utility for handling partitions.
"""

import hashlib
import logging
import os
import tempfile

from avocado.utils import filelock, process

LOG = logging.getLogger(__name__)


class PartitionError(Exception):
    """
    Generic PartitionError
    """

    def __init__(self, partition, reason, details=None):
        msg = reason + ": " + str(details) if details else reason
        super().__init__(msg)
        self.partition = partition

    def __str__(self):
        return f"Partition({self.partition.device}): {super().__str__()}"


class MtabLock:
    device = "/etc/mtab"

    def __init__(self, timeout=60):
        self.timeout = timeout
        self.mtab = None
        device_hash = hashlib.sha1(self.device.encode("utf-8")).hexdigest()
        lock_filename = os.path.join(tempfile.gettempdir(), device_hash)
        self.lock = filelock.FileLock(lock_filename, timeout=self.timeout)

    def __enter__(self):
        try:
            self.lock.__enter__()
        except (filelock.LockFailed, filelock.AlreadyLocked) as e:
            reason = (
                f"Unable to obtain '{self.device}' " f"lock in {int(self.timeout)}s"
            )
            raise PartitionError(self, reason, e)
        self.mtab = open(self.device)  # pylint: disable=W1514
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if self.mtab:
            self.mtab.close()
        self.lock.__exit__(exc_type, exc_value, exc_traceback)


class Partition:
    """
    Class for handling partitions and filesystems
    """

    def __init__(
        self, device, loop_size=0, mountpoint=None, mkfs_flags="", mount_options=None
    ):
        """
        :param device: The device in question (e.g."/dev/hda2"). If device is a
                file it will be mounted as loopback.
        :param loop_size: Size of loopback device (in MB). Defaults to 0.
        :param mountpoint: Where the partition to be mounted to.
        :param mkfs_flags: Optional flags for mkfs
        :param mount_options: Add mount options optionally
        """
        self.device = device
        self.loop = loop_size
        self.fstype = None
        self.mountpoint = mountpoint
        self.mkfs_flags = mkfs_flags
        self.mount_options = mount_options
        if self.loop:
            process.run(f"dd if=/dev/zero of={device} bs=1M " f"count={int(self.loop)}")

    def __repr__(self):
        return f"<Partition: {self.device}>"

    @staticmethod
    def list_mount_devices():
        """
        Lists mounted file systems and swap on devices.
        """
        # list mounted file systems
        devices = [line.split()[0] for line in process.getoutput("mount").splitlines()]
        # list mounted swap devices
        swaps = process.getoutput("swapon -s").splitlines()
        devices.extend([line.split()[0] for line in swaps if line.startswith("/")])
        return devices

    @staticmethod
    def list_mount_points():
        """
        Lists the mount points.
        """
        return [line.split()[2] for line in process.getoutput("mount").splitlines()]

    def get_mountpoint(self, filename=None):
        """
        Find the mount point of this partition object.

        :param filename: where to look for the mounted partitions information
                (default None which means it will search /proc/mounts and/or
                /etc/mtab)

        :return: a string with the mount point of the partition or None if not
                mounted
        """
        # Try to match this device/mountpoint
        if filename:
            with open(filename) as open_file:  # pylint: disable=W1514
                for line in open_file:
                    parts = line.split()
                    if parts[0] == self.device or os.path.realpath(
                        parts[0]
                    ) == os.path.realpath(self.device):
                        if parts[1] == self.mountpoint:
                            return parts[1]  # The mountpoint where it's mounted
                return None

        # no specific file given, look in /proc/mounts
        res = self.get_mountpoint(filename="/proc/mounts")
        if not res:
            # sometimes the root partition is reported as /dev/root in
            # /proc/mounts in this case, try /etc/mtab
            res = self.get_mountpoint(filename="/etc/mtab")

            if res != "/":
                res = None
        return res

    def mkfs(self, fstype=None, args=""):
        """
        Format a partition to filesystem type

        :param fstype: the filesystem type, such as "ext3", "ext2". Defaults
                       to previously set type or "ext2" if none has set.
        :param args: arguments to be passed to mkfs command.
        """

        if self.device in self.list_mount_devices():
            raise PartitionError(self, "Unable to format mounted device")

        if not fstype:
            if self.fstype:
                fstype = self.fstype
            else:
                fstype = "ext2"

        if self.mkfs_flags:
            args += " " + self.mkfs_flags
        if fstype in ["xfs", "btrfs"]:
            args += " -f"

        if self.loop:
            if fstype.startswith("ext"):
                args += " -F"
            elif fstype == "reiserfs":
                args += " -f"

        # If there isn't already a '-t <type>' argument, add one.
        if "-t" not in args:
            args = f"-t {fstype} {args}"

        args = args.strip()

        mkfs_cmd = f"mkfs {args} {self.device}"

        try:
            process.system_output(f"yes | {mkfs_cmd}", shell=True)
        except process.CmdError as error:
            raise PartitionError(self, "Failed to mkfs", error)
        else:
            self.fstype = fstype

    def mount(self, mountpoint=None, fstype=None, args="", mnt_check=True):
        """
        Mount this partition to a mount point

        :param mountpoint: If you have not provided a mountpoint to partition
                object or want to use a different one, you may specify it here.
        :param fstype: Filesystem type. If not provided partition object value
                will be used.
        :param args: Arguments to be passed to "mount" command.
        :param mnt_check: Flag to check/avoid checking existing device/mountpoint
        """
        if not mountpoint:
            mountpoint = self.mountpoint
        if not mountpoint:
            raise PartitionError(
                self,
                "No mountpoint specified and no "
                "default provided to this partition object",
            )
        if fstype is None:
            fstype = self.fstype
        else:
            self.fstype = fstype

        if self.mount_options:
            args += " -o " + self.mount_options
        if fstype:
            args += " -t " + fstype
        args = args.lstrip()

        with MtabLock():
            if mnt_check:
                if self.device in self.list_mount_devices():
                    raise PartitionError(self, "Attempted to mount mounted device")
                if mountpoint in self.list_mount_points():
                    raise PartitionError(self, "Attempted to mount busy directory")
            if not os.path.isdir(mountpoint):
                os.makedirs(mountpoint)
            try:
                if self.loop:
                    losetup_cmd = f"losetup --find --show -f {self.device}"
                    self.device = process.run(
                        losetup_cmd, sudo=True
                    ).stdout_text.strip()
                process.system(f"mount {args} {self.device} {mountpoint}", sudo=True)
            except process.CmdError as details:
                raise PartitionError(self, "Mount failed", details)
        # Update the fstype as the mount command passed
        self.fstype = fstype

    def _get_pids_on_mountpoint(self, mnt):
        """
        Returns a list of processes using a given mountpoint
        """
        try:
            cmd = "lsof " + mnt
            out = process.system_output(cmd, sudo=True)
            return [int(line.split()[1]) for line in out.splitlines()[1:]]
        except OSError as details:
            msg = f'Could not run lsof to identify processes using "{mnt}"'
            LOG.error(msg)
            raise PartitionError(self, msg, details)
        except process.CmdError as details:
            msg = f'Failure executing "{cmd}"'
            LOG.error(msg)
            raise PartitionError(self, msg, details)

    def _unmount_force(self, mountpoint):
        """
        Kill all other jobs accessing this partition and force unmount it.

        :return: None
        :raise PartitionError: On critical failure
        """
        for pid in self._get_pids_on_mountpoint(mountpoint):
            try:
                process.system(f"kill -9 {int(pid)}", ignore_status=True, sudo=True)
            except process.CmdError as kill_details:
                raise PartitionError(self, "Failed to kill processes", kill_details)
        # Unmount
        try:
            process.run(f"umount -f {mountpoint}", sudo=True)
        except process.CmdError:
            try:
                process.run(f"umount -l {mountpoint}", sudo=True)
            except process.CmdError as umount_details:
                raise PartitionError(self, "Force unmount failed", umount_details)

    def unmount(self, force=True):
        """
        Umount this partition.

        It's easier said than done to umount a partition.
        We need to lock the mtab file to make sure we don't have any
        locking problems if we are umounting in parallel.

        When the unmount fails and force==True we unmount the partition
        ungracefully.

        :return: 1 on success, 2 on force umount success
        :raise PartitionError: On failure
        """
        with MtabLock():
            mountpoint = self.get_mountpoint()
            result = 1
            if not mountpoint:
                LOG.debug("%s not mounted", self.device)
                return result
            try:
                process.run("umount " + mountpoint, sudo=True)
                result = 1
            except process.CmdError as details:
                if force:
                    LOG.debug("Standard umount failed on %s, forcing", mountpoint)
                    self._unmount_force(mountpoint)
                    result = 2
                else:
                    raise PartitionError(self, "Unable to unmount gracefully", details)
        if self.loop:
            try:
                process.run(f"losetup -d {self.device}", sudo=True)
            except process.CmdError as details:
                raise PartitionError(self, "Unable to cleanup loop device", details)

        return result