avocado/utils/disk.py
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# This code was inspired by the autotest project,
#
# client/base_utils.py
#
# Copyright: 2018 IBM
# Authors : Praveen K Pandey <praveen@linux.vnet.ibm.com>
# : Narasimhan V <sim@linux.vnet.ibm.com>
# : Naresh Bannoth <nbannoth@linux.vnet.ibm.com>
"""
Disk utilities
"""
import json
import logging
import os
import re
from avocado.utils import genio, multipath, process
LOGGER = logging.getLogger(__name__)
class DiskError(Exception):
    """
    Exception raised by the disk utilities in this module when an
    operation on a disk, partition or loop device cannot be completed.
    """
def freespace(path):
    """
    Return the space available to unprivileged users on a filesystem.

    :param path: any path located inside the filesystem to inspect
    :type path: str
    :returns: available space, in bytes
    :rtype: int
    """
    fs_stats = os.statvfs(path)
    # statvfs block counts (f_bavail included) are expressed in units of
    # f_frsize; f_bsize is only the preferred I/O block size and may differ.
    return fs_stats.f_frsize * fs_stats.f_bavail
def get_disk_blocksize(path):
    """
    Report the block size of the filesystem that holds the given path.

    :param path: any path located inside the filesystem to inspect
    :type path: str
    :returns: the ``f_bsize`` value reported by ``os.statvfs``, in bytes
    :rtype: int
    """
    return os.statvfs(path).f_bsize
def create_loop_device(size, blocksize=4096, directory="./"):
    """
    Creates a loop device of size and blocksize specified.

    The first unused loop device is looked up with ``losetup --find``, a
    zero-filled backing file named ``tmp_<loop name>.img`` is written with
    ``dd`` and then attached to that device.

    :param size: Size of loop device, in bytes
    :type size: int
    :param blocksize: block size of loop device, in bytes. Defaults to 4096
    :type blocksize: int
    :param directory: Directory where the backing file will be created.
                      Defaults to current directory.
    :type directory: str
    :return: loop device name
    :rtype: str
    :raises DiskError: if no free loop device is reported, or if the
                       backing file or the loop device cannot be created
    """
    find_result = process.run("losetup --find", ignore_status=True, sudo=True)
    loop = find_result.stdout_text.strip("\n")
    if not loop:
        # ignore_status=True hides a failing "losetup --find"; stop here
        # rather than writing a backing file for a device that has no name.
        raise DiskError("Unable to find a free loop device")
    loop_file = os.path.join(directory, f"tmp_{loop.split('/')[-1]}.img")
    # Floor division keeps the block count exact for large integer sizes.
    cmd = (
        f"dd if=/dev/zero of={loop_file} bs={blocksize} "
        f"count={int(size // blocksize)}"
    )
    if process.system(cmd, ignore_status=True, sudo=True) != 0:
        raise DiskError("Unable to create backing file for loop device")
    cmd = f"losetup {loop} {loop_file} -P"
    if process.system(cmd, ignore_status=True, sudo=True) != 0:
        raise DiskError("Unable to create the loop device")
    return loop
def delete_loop_device(device):
    """
    Deletes the specified loop device.

    The backing file is looked up in the JSON listing printed by
    ``losetup -aJl``; the device is then detached and the backing file
    removed.

    :param device: device to be deleted
    :type device: str
    :return: True if deleted.
    :rtype: bool
    :raises DiskError: if the backing file of the device cannot be
                       determined or the device cannot be detached
    """
    cmd = "losetup -aJl"
    listing = process.run(cmd, ignore_status=True, sudo=True).stdout_text
    try:
        loop_devices = json.loads(listing)["loopdevices"]
    except (json.JSONDecodeError, KeyError, TypeError) as err:
        # Empty or unexpected losetup output means the device cannot be
        # looked up; report it with the module's own exception type.
        raise DiskError("Unable to find backing file for loop device") from err
    loop_file = ""
    for loop_dev in loop_devices:
        if device == loop_dev["name"]:
            loop_file = loop_dev["back-file"]
    if not loop_file:
        raise DiskError("Unable to find backing file for loop device")
    cmd = f"losetup -d {device}"
    if process.system(cmd, ignore_status=True, sudo=True) != 0:
        raise DiskError("Unable to delete the loop device")
    os.remove(loop_file)
    return True
def get_disks():
    """
    Returns the block devices reported by `lsblk` on this system

    This is a simple wrapper around ``lsblk --json --paths --inverse``:
    the name of every top level entry is returned, each one followed by
    the names of its direct children (when it has any).

    TODO: this is currently Linux specific. Support for other
    platforms is desirable and may be implemented in the future.

    :returns: a list of device paths
    :rtype: list of str
    :raises DiskError: if lsblk fails or its output is not valid JSON
    """
    try:
        lsblk_result = process.run("lsblk --json --paths --inverse")
    except process.CmdError as ce:
        raise DiskError(f"Error occurred while executing lsblk command: {ce}")
    try:
        parsed = json.loads(lsblk_result.stdout_text)
    except json.JSONDecodeError as je:
        raise DiskError(f"Error occurred while parsing JSON data: {je}")
    found = []
    for entry in parsed["blockdevices"]:
        found.append(entry["name"])
        found.extend(child["name"] for child in entry.get("children", []))
    return found
def get_all_disk_paths():
    """
    Returns all available disk names and aliases on this system

    Every entry found directly under /dev, /dev/mapper and the
    /dev/disk/by-* alias directories is returned with its full path.
    Directories that do not exist are skipped.

    :returns: a list of all disk path names, without duplicates
    :rtype: list of str
    """
    search_dirs = (
        "/dev",
        "/dev/mapper",
        "/dev/disk/by-id",
        "/dev/disk/by-path",
        "/dev/disk/by-uuid",
        "/dev/disk/by-partuuid",
        "/dev/disk/by-partlabel",
    )
    # A single accumulator is used on purpose: keeping two names bound to
    # the same list and extending one with the other duplicates every entry.
    disk_list = []
    for path in search_dirs:
        if os.path.exists(path):
            disk_list.extend(
                os.path.join(path, device) for device in os.listdir(path)
            )
    return disk_list
def get_absolute_disk_path(device):
    """
    Returns absolute device path of given disk

    A value that already exists as a path is returned untouched.
    Otherwise the value is compared with the last path component of every
    entry from get_all_disk_paths() and the first matching entry is
    returned. When nothing matches, the value is returned as given.

    :param device: disk name or disk alias name, e.g. sda or scsi-xxx
    :type device: str
    :returns: the device absolute path name
    :rtype: str
    """
    if os.path.exists(device):
        return device
    for candidate in get_all_disk_paths():
        if os.path.basename(candidate) == device:
            return candidate
    return device
def get_available_filesystems():
    """
    Return a list of all available filesystem types

    The names are read from /proc/filesystems, with the "nodev" marker
    and all whitespace stripped from every line.

    :returns: a list of filesystem types, in no particular order
    :rtype: list of str
    """
    with open("/proc/filesystems") as proc_fs:  # pylint: disable=W1514
        names = {re.sub(r"(nodev)?\s*", "", entry) for entry in proc_fs}
    return list(names)
def get_filesystem_type(mount_point="/"):
    """
    Returns the type of the filesystem mounted at the given mount point.

    /proc/mounts is scanned from the top and the type of the first entry
    whose mount point matches is returned.

    :param str mount_point: mount point to assess the filesystem type.
                            Default "/"
    :returns: filesystem type, or None when no entry matches
    :rtype: str
    """
    with open("/proc/mounts") as mounts:  # pylint: disable=W1514
        for entry in mounts:
            _, entry_dir, entry_type, _, _, _ = entry.split()
            if entry_dir == mount_point:
                return entry_type
    return None
def is_root_device(device):
    """
    Check whether the given device holds the root filesystem.

    The MOUNTPOINT and PKNAME columns of lsblk are inspected; the device
    matches when an entry mounted on "/" has a PKNAME equal to it.

    :param device: device to check, compared verbatim with lsblk's PKNAME
    :returns: True if given device is the root disk, False otherwise.
    :rtype: bool
    """
    lsblk_result = process.run("lsblk --j -o MOUNTPOINT,PKNAME")
    entries = json.loads(lsblk_result.stdout_text)["blockdevices"]
    return any(
        entry["mountpoint"] == "/" and device == str(entry["pkname"])
        for entry in entries
    )
def is_disk_mounted(device):
    """
    Check if the given disk is mounted, according to /proc/mounts

    :param device: disk/device name, as it appears in the first column
                   of /proc/mounts
    :type device: str
    :returns: True if the device/disk is mounted else False
    :rtype: bool
    """
    with open("/proc/mounts") as mounts:  # pylint: disable=W1514
        for entry in mounts:
            source, _, _, _, _, _ = entry.split()
            if source == device:
                return True
    return False
def is_dir_mounted(dir_path):
    """
    Check if the given directory is a mount point listed in /proc/mounts

    :param dir_path: directory path
    :type dir_path: str
    :returns: True if the given directory is mounted else False
    :rtype: bool
    """
    with open("/proc/mounts") as mounts:  # pylint: disable=W1514
        for entry in mounts:
            _, target, _, _, _, _ = entry.split()
            if target == dir_path:
                return True
    return False
def fs_exists(device):
    """
    Check if a known filesystem exists on the given disk/device

    The TYPE reported by blkid is compared against ext2, ext3, ext4,
    xfs and btrfs; any other value counts as "no filesystem".

    :param device: disk/device name
    :type device: str
    :returns: True if one of the known filesystems exists on the given
              disk else False
    :rtype: bool
    """
    known_types = ["ext2", "ext3", "ext4", "xfs", "btrfs"]
    raw_type = process.system_output(
        f"blkid -o value -s TYPE {device}", shell=True, ignore_status=True
    )
    return raw_type.decode("utf-8") in known_types
def get_dir_mountpoint(dir_path):
    """
    Get the name of the disk that is mounted on the given dir_path

    :param dir_path: absolute directory path
    :type dir_path: str
    :returns: disk name of the first /proc/mounts entry mounted on
              dir_path, or None when there is no such entry
    :rtype: str
    """
    with open("/proc/mounts") as mounts:  # pylint: disable=W1514
        for entry in mounts:
            source, target, _, _, _, _ = entry.split()
            if target == dir_path:
                return source
    return None
def get_disk_mountpoint(device):
    """
    Get the mount point on which the given disk is mounted

    :param device: disk/device name, as it appears in the first column
                   of /proc/mounts
    :type device: str
    :return: directory of the first /proc/mounts entry for the disk, or
             None when the disk is not listed
    :rtype: str
    """
    with open("/proc/mounts") as mounts:  # pylint: disable=W1514
        for entry in mounts:
            source, target, _, _, _, _ = entry.split()
            if source == device:
                return target
    return None
def create_linux_raw_partition(disk_name, size=None, num_of_par=1):
    """
    Creates partitions using sfdisk command

    An sfdisk script is written to a timestamped file under /tmp and fed
    to ``sfdisk <disk_name>``. The script file is not removed afterwards.

    :param disk_name: disk/device name
    :type disk_name: str
    :param size: size of each partition, in a form sfdisk understands.
                 When omitted, the disk size in GiB divided by
                 num_of_par is used.
    :type size: str
    :param num_of_par: Number of partitions to be created
    :type num_of_par: int
    :returns: list of partitions as reported by get_disk_partitions()
              when sfdisk reports that the partition table was altered,
              None otherwise
    :rtype: list of str or None
    :raises DiskError: if running sfdisk raises an exception
    """
    if not size:
        size = get_size_of_disk(disk_name) / 1073741824
        size = size / num_of_par
        size = str(size) + "G"
    # One script line per index in 0..num_of_par; the fourth line (index 3)
    # requests a partition of type 5 instead of a sized partition.
    partitions = [
        "size= +" + size if val != 3 else "type=5" for val in range(0, num_of_par + 1)
    ]
    disk_partition_file = (
        "/tmp/creat_partition" + process.run("date '+%d-%m-%y_%T'").stdout_text.strip()
    )
    if not os.path.isfile(disk_partition_file):
        process.run("touch " + disk_partition_file)
    for line in partitions:
        genio.append_one_line(disk_partition_file, line)
    try:
        part_output = process.getoutput(
            "sfdisk " + disk_name + " < " + disk_partition_file
        )
    except Exception as err:
        # Only real errors are converted; KeyboardInterrupt and SystemExit
        # must propagate untouched, which a bare "except:" would prevent.
        msg = f"sfdisk partition creation command failed on disk {disk_name}"
        LOGGER.warning(msg)
        raise DiskError(msg) from err
    rescan_disk(disk_name)
    if "The partition table has been altered" in part_output:
        return get_disk_partitions(disk_name)
    return None
def get_size_of_disk(disk):
    """
    Returns size of disk in bytes, as reported by ``lsblk -b``

    :param disk: disk/device name
    :type disk: str
    :returns: size of the disk, in bytes
    :rtype: int
    """
    lsblk_cmd = "lsblk -b --output SIZE -n -d " + disk
    raw_size = process.getoutput(lsblk_cmd)
    return int(raw_size)
def delete_partition(partition_name):
    """
    Deletes mentioned partition from disk

    The partition path is split into its disk and its trailing partition
    number, which are then handed to ``sfdisk --delete <disk> <number>``.
    Three naming schemes are recognised:

    * ``<disk>p<N>`` when the disk name itself ends in a digit
      (e.g. /dev/nvme0n1p1, /dev/loop0p2)
    * ``<disk>-part<N>`` (udev style aliases)
    * ``<disk><N>`` (e.g. /dev/sda1)

    :param partition_name: partition absolute path
    :type partition_name: str
    :raises DiskError: if no partition number can be found in the name
                       or the sfdisk command fails
    """
    # The partition number is the TRAILING digit run; splitting at the
    # first digit would cut names such as nvme0n1p1 in the wrong place.
    parsed = re.fullmatch(r"(.*\d)p(\d+)", partition_name) or re.fullmatch(
        r"(.+?)(?:-part)?(\d+)", partition_name
    )
    if not parsed:
        msg = f"Unable to find a partition number in {partition_name}"
        LOGGER.warning(msg)
        raise DiskError(msg)
    disk, number = parsed.groups()
    try:
        process.run("sfdisk --delete " + disk + " " + number)
    except Exception as err:
        # Not a bare "except:", so KeyboardInterrupt/SystemExit propagate.
        msg = f"sfdisk --delete command failed on disk {partition_name}"
        LOGGER.warning(msg)
        raise DiskError(msg) from err
def clean_disk(disk_name):
    """
    Cleans the partition table of a disk

    All partitions are deleted with ``sfdisk --delete`` and the disk is
    rescanned. When no partition is left and sfdisk reported that the
    partition table was altered, the remaining signatures are wiped with
    ``wipefs -af``.

    :param disk_name: disk name
    :type disk_name: str
    """
    sfdisk_output = process.getoutput("sfdisk --delete " + disk_name)
    rescan_disk(disk_name)
    if get_disk_partitions(disk_name):
        # Partitions are still present: nothing more to do.
        return
    if "The partition table has been altered" in sfdisk_output:
        process.run("wipefs -af " + disk_name)
def rescan_disk(disk_name):
    """
    Rescans disk

    The device path is resolved with ``os.path.realpath``. For device
    mapper nodes (``dm-N``) the first path of the first path group of the
    matching multipath map is rescanned instead. The rescan itself is
    requested by writing ``1`` to ``/sys/block/<name>/device/rescan``.

    The rescan is best effort: a failing write is logged, not raised.

    :param disk_name: disk name
    :type disk_name: str
    """
    disk_name = os.path.realpath(disk_name)
    if re.search(r"dm-\d+", disk_name):
        dm_name = os.path.basename(disk_name)
        for mpath in multipath.get_multipath_details()["maps"]:
            if mpath["sysfs"] == dm_name:
                disk_name = "/dev/" + mpath["path_groups"][0]["paths"][0]["dev"]
                break
    # /sys/block is indexed by the bare kernel name, not by the /dev path.
    sys_name = os.path.basename(disk_name)
    # The redirection only takes effect when the command runs in a shell.
    result = process.run(
        f"echo 1 > /sys/block/{sys_name}/device/rescan",
        shell=True,
        ignore_status=True,
    )
    if result.exit_status != 0:
        LOGGER.warning("Unable to rescan disk %s", disk_name)
def get_disk_partitions(disk):
    """
    Returns partitions of a disk excluding extended partition

    The disk is rescanned first; the partitions are then taken from the
    lines of ``sfdisk -l`` that start with the disk name, skipping those
    that mention "Extended".

    :param disk: disk name
    :type disk: str
    :returns: list with all partitions of the disk
    :rtype: list of str
    """
    rescan_disk(disk)
    listing = process.getoutput("sfdisk -l " + disk)
    found = []
    for row in listing.split("\n"):
        if not row.startswith(disk):
            continue
        if "Extended" in row:
            continue
        # The partition path is the first space-separated column.
        found.append(row.split(" ")[0])
    return found