avocado/utils/multipath.py
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: 2016 IBM
# Author: Narasimhan V <sim@linux.vnet.ibm.com>
"""
Module with multipath related utility functions.
It needs root access.
"""
import ast
import logging
import time
from avocado.utils import distro, process, service, wait
LOG = logging.getLogger(__name__)
class MPException(Exception):
"""
Base Exception Class for all exceptions
"""
def get_svc_name():
"""
Gets the multipath service name based on distro.
"""
if distro.detect().name == "Ubuntu":
return "multipath-tools"
return "multipathd"
def form_conf_mpath_file(blacklist="", defaults_extra=""):
"""
Form a multipath configuration file, and restart multipath service.
:param blacklist: Entry in conf file to indicate blacklist section.
:param defaults_extra: Extra entry in conf file in defaults section.
"""
conf_file = "/etc/multipath.conf"
with open(conf_file, "w", encoding="utf-8") as mpath_fp:
mpath_fp.write("defaults {\n")
mpath_fp.write(" find_multipaths yes\n")
mpath_fp.write(" user_friendly_names yes\n")
if defaults_extra:
mpath_fp.write(f" {defaults_extra}\n")
mpath_fp.write("}\n")
if blacklist:
mpath_fp.write("blacklist {\n")
mpath_fp.write(f" {blacklist}\n")
mpath_fp.write("}\n")
LOG.debug(open(conf_file, "r", encoding="utf-8").read())
# The reason for sleep here is to give some time for change in
# multipath.conf file to take effect.
time.sleep(5)
mpath_svc = service.SpecificServiceManager(get_svc_name())
mpath_svc.restart()
wait.wait_for(mpath_svc.status, timeout=10)
def device_exists(mpath):
"""
Checks if a given mpath exists.
:param mpath: The multipath path
:return: True if path exists, False if does not exist.
:rtype: bool
"""
cmd = "multipath -ll"
out = process.run(cmd, ignore_status=True, sudo=True, shell=True).stdout_text
if mpath in out:
return True
return False
def get_mpath_name(wwid):
"""
Get multipath name for a given wwid.
:param wwid: wwid of multipath device.
:return: Name of multipath device.
:rtype: str
"""
if device_exists(wwid):
cmd = f"multipath -l {wwid}"
return process.run(cmd, sudo=True).stdout_text.split()[0]
def get_mpath_from_dm(dm_id):
"""
Get the mpath name for given device mapper id
:param dev_mapper: Input device mapper dm-x
:return: mpath name like mpathx
:rtype: str
"""
cmd = "multipathd show maps format '%d %n'"
try:
mpaths = process.run(cmd, ignore_status=True, sudo=True, shell=True).stdout_text
except process.CmdError as ex:
raise MPException(f"Multipathd Command Failed : {ex} ")
for mpath in mpaths.splitlines():
if dm_id in mpath:
return mpath.split()[1]
def get_multipath_wwids():
"""
Get list of multipath wwids.
:return: List of multipath wwids.
:rtype: list of str
"""
cmd = "egrep -v '^($|#)' /etc/multipath/wwids"
wwids = process.run(cmd, ignore_status=True, sudo=True, shell=True).stdout_text
wwids = wwids.strip("\n").replace("/", "").split("\n")
return wwids
def get_multipath_wwid(mpath):
"""
Get the wwid binding for given mpath name
:return: Multipath wwid
:rtype: str
"""
cmd = "multipathd show maps format '%n %w'"
try:
wwids = process.run(cmd, ignore_status=True, sudo=True, shell=True).stdout_text
except process.CmdError as ex:
raise MPException(f"Multipathd Command Failed : {ex} ")
for wwid in wwids.splitlines():
if mpath in wwid:
return wwid.split()[1]
def is_mpath_dev(mpath):
"""
Check the give name is a multipath device name or not.
:return: True if device is multipath or False
:rtype: Boolean
"""
cmd = "multipath -l -v 1"
try:
mpaths = process.run(cmd, ignore_status=True, sudo=True, shell=True).stdout_text
except process.CmdError as ex:
raise MPException(f"Multipath Command Failed : {ex} ")
if mpath in mpaths.strip("\n").split("\n"):
return True
return False
def get_paths(wwid):
"""
Get list of paths, given a multipath wwid.
:return: List of paths.
:rtype: list of str
"""
if not device_exists(wwid):
return
cmd = f"multipath -ll {wwid}"
lines = process.run(cmd, sudo=True).stdout_text.strip("\n")
paths = []
for line in lines.split("\n"):
if not (("size" in line) or ("policy" in line) or (wwid in line)):
paths.append(line.split()[-5])
return paths
def get_multipath_details():
"""
Get multipath details as a dictionary.
This is the output of the following command:
$ multipathd show maps json
:return: Dictionary of multipath output in json format
:rtype: dict
"""
mpath_op = process.run(
"multipathd show maps json", sudo=True, verbose=False
).stdout_text
if "multipath-tools v" in mpath_op:
return ""
mpath_op = ast.literal_eval(mpath_op.replace("\n", "").replace(" ", ""))
return mpath_op
def is_path_a_multipath(disk_path):
"""
Check if given disk path is part of a multipath.
:param disk_path: disk path. Example: sda, sdb.
:return: True if part of multipath, else False.
"""
if not process.system(
f"multipath -c /dev/{disk_path}", sudo=True, ignore_status=True, shell=True
):
return True
return False
def get_path_status(disk_path):
"""
Return the status of a path in multipath.
:param disk_path: disk path. Example: sda, sdb.
:return: Tuple in the format of (dm status, dev status, checker status)
"""
mpath_op = get_multipath_details()
if not mpath_op:
return ("", "", "")
for maps in mpath_op["maps"]:
for path_groups in maps["path_groups"]:
for paths in path_groups["paths"]:
if paths["dev"] == disk_path:
return (paths["dm_st"], paths["dev_st"], paths["chk_st"])
def get_mpath_paths_status(wwid):
"""
Return the status of all paths of mpath device.
:param wwid: wwid or user friedly name of mpath.
Example: mpatha or 360050768108001b3a800000000000296
:return: Dict in the format of {path: (dm status, dev status, checker status)}
"""
mpath_op = get_multipath_details()
if not mpath_op:
return
wwid_paths = {}
for maps in mpath_op["maps"]:
if maps["name"] == wwid or maps["uuid"] == wwid:
for path_groups in maps["path_groups"]:
for paths in path_groups["paths"]:
wwid_paths[paths["dev"]] = (
paths["dm_st"],
paths["dev_st"],
paths["chk_st"],
)
if len(wwid_paths) != 0:
return wwid_paths
return
def fail_path(path):
"""
Fail the individual paths.
:param str path: disk path. Example: sda, sdb.
:return: True if succeeded, False otherwise
:rtype: bool
"""
def is_failed():
path_stat = get_path_status(path)
if path_stat[0] == "failed" and path_stat[2] == "faulty":
return True
return False
cmd = f'multipathd -k"fail path {path}"'
if process.system(cmd) == 0:
return wait.wait_for(is_failed, timeout=10) or False
return False
def reinstate_path(path):
"""
Reinstate the individual paths.
:param str path: disk path. Example: sda, sdb.
:return: True if succeeded, False otherwise
"""
def is_reinstated():
path_stat = get_path_status(path)
if path_stat[0] == "active" and path_stat[2] == "ready":
return True
return False
cmd = f'multipathd -k"reinstate path {path}"'
if process.system(cmd) == 0:
return wait.wait_for(is_reinstated, timeout=10) or False
return False
def get_policy(wwid):
"""
Gets path_checker policy, given a multipath wwid.
:return: path checker policy.
:rtype: str
"""
if device_exists(wwid):
cmd = f"multipath -ll {wwid}"
lines = process.run(cmd, sudo=True).stdout_text.strip("\n")
for line in lines.split("\n"):
if "policy" in line:
return line.split("'")[1].split()[0]
def get_size(wwid):
"""
Gets size of device, given a multipath wwid.
:return: size of multipath device.
:rtype: str
"""
if device_exists(wwid):
cmd = f"multipath -ll {wwid}"
lines = process.run(cmd, sudo=True).stdout_text.strip("\n")
for line in lines.split("\n"):
if "size" in line:
return line.split("=")[1].split()[0]
def flush_path(path_name):
"""
Flushes the given multipath.
:return: Returns False if command fails, True otherwise.
"""
cmd = f"multipath -f {path_name}"
if process.system(cmd, ignore_status=True, sudo=True, shell=True):
return False
return True
def get_mpath_status(mpath):
"""
Get the status of mpathX of multipaths.
:param mpath: mpath names. Example: mpatha, mpathb.
:return: state of mpathX eg: Active, Suspend, None
"""
cmd = f'multipathd -k"show maps status" | grep -i {mpath}'
mpath_status = process.getoutput(cmd).split()[-2]
return mpath_status
def suspend_mpath(mpath):
"""
Suspend the given mpathX of multipaths.
:param mpath: mpath names. Example: mpatha, mpathb.
:return: True or False
"""
def is_mpath_suspended():
if get_mpath_status(mpath) == "suspend":
return True
return False
cmd = f'multipathd -k"suspend map {mpath}"'
if process.system(cmd) == 0:
return wait.wait_for(is_mpath_suspended, timeout=10) or False
return False
def resume_mpath(mpath):
"""
Resume the suspended mpathX of multipaths.
:param mpath_name: mpath names. Example: mpatha, mpathb.
:return: True or False
"""
def is_mpath_resumed():
if get_mpath_status(mpath) == "active":
return True
return False
cmd = f'multipathd -k"resume map {mpath}"'
if process.system(cmd) == 0:
return wait.wait_for(is_mpath_resumed, timeout=10) or False
return False
def remove_mpath(mpath):
"""
Remove the mpathX of multipaths.
:param mpath_name: mpath names. Example: mpatha, mpathb.
:return: True or False
"""
def is_mpath_removed():
if device_exists(mpath):
return False
return True
cmd = f'multipathd -k"remove map {mpath}"'
if process.system(cmd) == 0:
return wait.wait_for(is_mpath_removed, timeout=10) or False
return False
def add_mpath(mpath):
"""
Add back the removed mpathX of multipath.
:param mpath_name: mpath names. Example: mpatha, mpathb.
:return: True or False
"""
def is_mpath_added():
if device_exists(mpath):
return True
return False
cmd = f'multipathd -k"add map {mpath}"'
if process.system(cmd) == 0:
return wait.wait_for(is_mpath_added, timeout=10) or False
return False
def remove_path(path):
"""
Remove the individual paths.
:param disk_path: disk path. Example: sda, sdb.
:return: True or False
"""
def is_path_removed():
if get_path_status(path) is None:
return True
return False
cmd = f'multipathd -k"remove path {path}"'
if process.system(cmd) == 0:
return wait.wait_for(is_path_removed, timeout=10) or False
return False
def add_path(path):
"""
Add back the removed individual paths.
:param str path: disk path. Example: sda, sdb.
:return: True or False
"""
def is_path_added():
if get_path_status(path) is None:
return False
return True
cmd = f'multipathd -k"add path {path}"'
if process.system(cmd) == 0:
return wait.wait_for(is_path_added, timeout=10) or False
return False