# avocado/utils/pmem.py
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (C) IBM 2020 - Harish <harish@linux.vnet.ibm.com>
#
# Author: Harish <harish@linux.vnet.ibm.com>
#
import glob
import json
import re
from avocado.utils import genio, path, process
class PMemException(Exception):
    """
    Error raised for all PMem failures

    The text given at construction time is kept in ``additional_text`` and
    is also forwarded to :class:`Exception`, so that ``args`` and ``repr()``
    carry it as well.
    """

    def __init__(self, additional_text=None):
        """
        :param additional_text: description of the failure, shown by ``str()``
        """
        # Only forward a real message, so that a bare PMemException() keeps
        # the empty ``args`` tuple of a bare Exception().
        if additional_text is None:
            super().__init__()
        else:
            super().__init__(additional_text)
        self.additional_text = additional_text

    def __str__(self):
        return f"Command failed.\ninfo: {self.additional_text}"
class PMem:
    """
    PMem class which provides functions to perform ndctl and daxctl operations

    Every operation is carried out by running the ``ndctl`` or ``daxctl``
    binary, so this class can be used only if both binaries are provided
    beforehand.
    """

    def __init__(self, ndctl="ndctl", daxctl="daxctl"):
        """
        Initialize PMem object

        :param ndctl: path to ndctl binary, defaults to ndctl
        :param daxctl: path to daxctl binary, defaults to daxctl
        :raise: :class:`PMemException`, if ``path.find_command`` yields no
                usable path for one of the binaries.
        """
        abs_ndctl = path.find_command(ndctl, False)
        if not abs_ndctl:
            raise PMemException("Cannot use library without proper ndctl binary")
        self.ndctl = abs_ndctl
        abs_daxctl = path.find_command(daxctl, False)
        if not abs_daxctl:
            raise PMemException("Cannot use library without proper daxctl binary")
        self.daxctl = abs_daxctl

    @staticmethod
    def _format_options(valued=(), flags=()):
        """Render command line options, skipping the ones that are unset

        Options are passed as ordered pairs rather than as a dict keyed by
        the option value, so two options sharing the same value (or two
        enabled flags) can never overwrite each other.

        :param valued: iterable of ``(switch, value)`` pairs; a pair is
                       rendered as ``" <switch> <value>"`` if value is truthy
        :param flags: iterable of ``(switch, enabled)`` pairs; a pair is
                      rendered as ``" <switch>"`` if enabled is truthy
        :return: rendered options, each one preceded by a single space
        :rtype: str
        """
        rendered = [f" {switch} {value}" for switch, value in valued if value]
        rendered.extend(f" {switch}" for switch, enabled in flags if enabled)
        return "".join(rendered)

    @staticmethod
    def check_subcmd(binary, command):
        """Check if given sub command is supported by binary

        :param binary: binary whose ``--list-cmds`` output is searched
        :param command: sub command to check for existence
        :return: True if sub command is available
        :rtype: bool
        """
        cmd = f"{binary} --list-cmds"
        out = process.system_output(cmd).decode().splitlines()
        return command in out

    def check_ndctl_subcmd(self, command):
        """Check if given sub command is supported by ndctl"""
        return self.check_subcmd(self.ndctl, command)

    def check_daxctl_subcmd(self, command):
        """Check if given sub command is supported by daxctl"""
        return self.check_subcmd(self.daxctl, command)

    def run_ndctl_list(self, option=""):
        """
        Get the json of each provided options

        :param option: optional arguments to ndctl list command
        :return: By default returns entire list of json objects,
                 an empty list if the output is not valid json
        :rtype: list of json objects
        """
        cmd = f"{self.ndctl} list {option}"
        try:
            return json.loads(process.system_output(cmd))
        except ValueError:
            # json.loads() raises ValueError on empty/invalid output
            return []

    @staticmethod
    def run_ndctl_list_val(json_op, field):
        """
        Get the value of a field in given json

        :param json_op: Input Json object (a dict)
        :param field: Field to find the value from json_op object
        :rtype: Found value type, None if not found
        """
        return json_op.get(field)

    def run_daxctl_list(self, options=""):
        """
        Get the json of each provided options

        :param options: optional arguments to daxctl list command
        :return: By default returns entire list of json objects,
                 an empty list if the output is not valid json
        :rtype: list of json objects
        """
        cmd = f"{self.daxctl} list {options}"
        try:
            return json.loads(process.system_output(cmd))
        except ValueError:
            # Same contract as run_ndctl_list(): unparsable output (for
            # instance an empty one) means there is nothing to list.
            return []

    def set_dax_memory_online(self, device, region=None, no_movable=False):
        """Set memory from a given devdax device online

        :param device: Device from which memory is to be online
        :param region: Optionally filter device by region
        :param no_movable: Optionally make the memory non-movable
        :return: True if command succeeds
        :rtype: bool
        :raise: :class:`PMemException`, if command fails.
        """
        cmd = f"{self.daxctl} online-memory {device}"
        cmd += self._format_options(
            valued=(("-r", region),), flags=(("--no-movable", no_movable),)
        )
        if process.system(cmd, shell=True, ignore_status=True):
            raise PMemException(f"Failed to online memory with {device}")
        return True

    def set_dax_memory_offline(self, device, region=None):
        """Set memory from a given devdax device offline

        :param device: Device from which memory is to be offline
        :param region: Optionally filter device by region
        :return: True if command succeeds
        :rtype: bool
        :raise: :class:`PMemException`, if command fails.
        """
        cmd = f"{self.daxctl} offline-memory {device}"
        cmd += self._format_options(valued=(("-r", region),))
        if process.system(cmd, shell=True, ignore_status=True):
            raise PMemException(f"Failed to offline memory with {device}")
        return True

    def reconfigure_dax_device(
        self, device, mode="devdax", region=None, no_online=False, no_movable=False
    ):
        """Reconfigure devdax device into devdax or system-ram mode

        :param device: Device which is to be reconfigured
        :param mode: Mode with which device is to be configured, default:devdax
        :param region: Optionally filter device by region
        :param no_online: Optionally don't online the memory(only system-ram)
        :param no_movable: Optionally mark memory non-movable(only system-ram)
        :return: Property of configured device (stdout of the command)
        :rtype: str
        :raise: :class:`PMemException`, if command fails.
        """
        cmd = f"{self.daxctl} reconfigure-device {device} -m {mode}"
        cmd += self._format_options(
            valued=(("-r", region),),
            flags=(("-N", no_online), ("--no-movable", no_movable)),
        )
        device_property = process.run(cmd, shell=True, ignore_status=True)
        if device_property.exit_status:
            raise PMemException(f"Failed to reconfigure device {device}")
        return device_property.stdout_text

    def get_slot_count(self, region):
        """
        Get max slot count in the index area for a dimm backing a region

        The dimm is derived from the region number: region0 -> nmem0

        :param region: Region for which slot count is found
        :return: Number of slots for given region,
                 0 in case the labels can not be parsed or are empty
        :rtype: int
        """
        region_index = re.findall(r"\d+", region)[0]
        nmem = f"nmem{region_index}"
        try:
            json_op = json.loads(
                process.system_output(
                    f"{self.ndctl} read-labels -j {nmem}", shell=True
                )
            )
        except ValueError:
            return 0
        if not json_op:
            return 0
        first_dict = json_op[0]
        index_dict = self.run_ndctl_list_val(first_dict, "index")[0]
        # The reported "nslot" is reduced by 2, as in the original code.
        # NOTE(review): presumably slots reserved by the label area - confirm.
        return self.run_ndctl_list_val(index_dict, "nslot") - 2

    @staticmethod
    def is_region_legacy(region):
        """
        Check whether we have label index namespace. If legacy we can't create
        new namespaces.

        The decision is taken from the ``nstype`` sysfs attribute of the
        region: the value "4" is treated as legacy.

        :param region: Region for which legacy check is made
        :return: True if given region is legacy, else False
        :rtype: bool
        """
        reg = f"/sys/bus/nd/devices/{region}/nstype"
        nstype = genio.read_file(reg).rstrip("\n")
        return nstype == "4"

    @staticmethod
    def check_buses():
        """
        Get buses from sys subsystem to verify persistent devices exist

        :return: List of buses available
        :rtype: list
        """
        return glob.glob("/sys/bus/nd/drivers/nd_bus/ndbus*")

    def disable_region(self, name="all"):
        """
        Disable given region

        :param name: name of the region to be disabled
        :return: True on success
        :raise: :class:`PMemException`, if command fails.
        """
        if process.system(
            f"{self.ndctl} disable-region {name}", shell=True, ignore_status=True
        ):
            raise PMemException(f"Failed to disable {name} region(s)")
        return True

    def enable_region(self, name="all"):
        """
        Enable given region

        :param name: name of the region to be enabled
        :return: True on success
        :raise: :class:`PMemException`, if command fails.
        """
        if process.system(
            f"{self.ndctl} enable-region {name}", shell=True, ignore_status=True
        ):
            raise PMemException(f"Failed to enable {name} region(s)")
        return True

    def disable_namespace(self, namespace="all", region="", bus="", verbose=False):
        """
        Disable namespaces

        :param namespace: name of the namespace to be disabled
        :param region: Filter namespace by region
        :param bus: Filter namespace by bus
        :param verbose: Run the command with debug information if True
        :return: True on success
        :raise: :class:`PMemException`, if command fails.
        """
        args = namespace + self._format_options(
            valued=(("-r", region), ("-b", bus)), flags=(("-v", verbose),)
        )
        if process.system(
            f"{self.ndctl} disable-namespace {args}", shell=True, ignore_status=True
        ):
            raise PMemException(f"Namespace disable failed for {namespace}")
        return True

    def enable_namespace(self, namespace="all", region="", bus="", verbose=False):
        """
        Enable namespaces

        :param namespace: name of the namespace to be enabled
        :param region: Filter namespace by region
        :param bus: Filter namespace by bus
        :param verbose: Run the command with debug information if True
        :return: True on success
        :raise: :class:`PMemException`, if command fails.
        """
        args = namespace + self._format_options(
            valued=(("-r", region), ("-b", bus)), flags=(("-v", verbose),)
        )
        if process.system(
            f"{self.ndctl} enable-namespace {args}", shell=True, ignore_status=True
        ):
            raise PMemException(f'Namespace enable failed for "{namespace}"')
        return True

    def create_namespace(
        self,
        region="",
        bus="",
        n_type="pmem",
        mode="fsdax",
        memmap="dev",
        name="",
        size="",
        uuid="",
        sector_size="",
        align="",
        reconfig="",
        force=False,
        autolabel=False,
    ):
        """
        Creates namespace with specified options

        :param region: Region on which namespace has to be created
        :param bus: Bus with which namespace has to be created
        :param n_type: Type of namespace to be created [pmem/blk]
        :param mode: Mode of namespace to be created, defaults to fsdax
        :param memmap: Metadata mapping for created namespace, only used
                       for the fsdax and devdax modes
        :param name: Optional name provided for namespace
        :param size: Size with which namespace has to be created
        :param uuid: Optional uuid provided for namespace
        :param sector_size: Sector size with which namespace has to be created
        :param align: Alignment with which namespace has to be created
        :param reconfig: Optionally reconfigure namespace providing existing
                         namespace/region name
        :param force: Force creation of namespace
        :param autolabel: Optionally autolabel the namespace
        :return: True on success
        :raise: :class:`PMemException`, if command fails.
        """
        args = f"-t {n_type} -m {mode}"
        if mode in ("fsdax", "devdax"):
            args += f" -M {memmap}"
        # Ordered pairs instead of a dict keyed by the values: with a dict,
        # equal values (size == sector_size, force == autolabel == True, ...)
        # collapse into one key and options get silently dropped.
        args += self._format_options(
            valued=(
                ("-r", region),
                ("-b", bus),
                ("-n", name),
                ("-s", size),
                ("-u", uuid),
                ("-l", sector_size),
                ("-a", align),
                ("-e", reconfig),
            ),
            flags=(("-f", force), ("-L", autolabel)),
        )
        # The legacy check reads the sysfs entry of the region, so it only
        # makes sense when a region was given. On a legacy region the
        # existing namespace<N>.0 is reconfigured instead of creating one.
        if region and self.is_region_legacy(region) and not reconfig:
            region_index = re.findall(r"\d+", region)[0]
            args += f" -f -e namespace{region_index}.0"
        if process.system(
            f"{self.ndctl} create-namespace {args}", shell=True, ignore_status=True
        ):
            raise PMemException("Namespace create command failed")
        return True

    def destroy_namespace(self, namespace="all", region="", bus="", force=False):
        """
        Destroy namespaces, skipped in case of legacy namespace

        :param namespace: name of the namespace to be destroyed
        :param region: Filter namespace by region
        :param bus: Filter namespace by bus
        :param force: Force a namespace to be destroyed
        :return: True on Success (also when skipped for a legacy region)
        :raise: :class:`PMemException`, if command fails.
        """
        if region and self.is_region_legacy(region):
            return True
        args = namespace + self._format_options(
            valued=(("-r", region), ("-b", bus)), flags=(("-f", force),)
        )
        if process.system(
            f"{self.ndctl} destroy-namespace {args}", shell=True, ignore_status=True
        ):
            raise PMemException("Namespace destroy command failed")
        return True

    @staticmethod
    def _check_arg(key, kwargs):
        """Tell whether key is present in kwargs with a truthy value"""
        return bool(kwargs.get(key))

    @staticmethod
    def _check_add_arg(args_dict, key, kwargs, pop=False):
        """Render the option of key if kwargs holds a truthy value for it

        :param args_dict: mapping of key to a ``%``-style option template
        :param key: name of the option to look for in kwargs
        :param kwargs: options given by the caller
        :param pop: remove key from kwargs once it has been rendered
        :return: ``" <rendered option>"``, empty string if key is not set
        :rtype: str
        """
        if not PMem._check_arg(key, kwargs):
            return ""
        template = f" {args_dict[key]}"
        value = kwargs.pop(key) if pop else kwargs.get(key)
        return template % value

    @staticmethod
    def _filter_ns_infoblock(namespace, args_dict, kwargs):
        """Render the region/bus filters, used only for namespace "all"

        The rendered filters are removed from kwargs.

        :return: rendered filters, empty string for any other namespace
        :rtype: str
        """
        if namespace != "all":
            return ""
        return "".join(
            PMem._check_add_arg(args_dict, key, kwargs, pop=True)
            for key in ("region", "bus")
        )

    @staticmethod
    def _write_infoblock_args(namespace, stdout, output, kwargs):
        """Build the argument string of ``ndctl write-infoblock``

        The medium is chosen in the order namespace, stdout, output file.
        ``mode`` and ``memmap`` are always passed, defaulting to "fsdax" and
        "dev". Entries consumed while building are removed from kwargs.

        :param namespace: namespace to write to, takes precedence if set
        :param stdout: write to stdout (``-c``) if no namespace is given
        :param output: file to write to (``-o``) if neither of the above
        :param kwargs: dict of the remaining write-infoblock options
        :return: arguments for the write-infoblock command
        :rtype: str
        :raise: :class:`PMemException`, on a set option which is not known.
        """
        args_dict = {
            "region": "-r %s",
            "bus": "-b %s",
            "mode": "-m %s",
            "memmap": "-M %s",
            "size": "-s %s",
            "align": "-a %s",
            "uuid": "-u %s",
            "parent_uuid": "-p %s",
            "offset": "-O %s",
        }
        if namespace:
            args = namespace
        elif stdout:
            args = "-c"
        else:
            args = f"-o {output}"
        args += PMem._filter_ns_infoblock(namespace, args_dict, kwargs)
        args += f" {args_dict['mode']}" % kwargs.pop("mode", "fsdax")
        args += f" {args_dict['memmap']}" % kwargs.pop("memmap", "dev")
        for key, value in kwargs.items():
            if not value:
                continue
            if key not in args_dict:
                raise PMemException("Input not supported for write-infoblock")
            args += f" {args_dict[key]}" % value
        return args

    def write_infoblock(self, namespace="", stdout=False, output=None, **kwargs):
        """
        Write an infoblock to the specified medium.

        :param namespace: Write the infoblock to given namespace
        :param stdout: Write the infoblock to stdout if True
        :param output: Write the infoblock to the file path specified
        :param kwargs: Further options: region, bus, mode, memmap, size,
                       align, uuid, parent_uuid, offset

            Example:
               pmem.write_infoblock(namespace=ns_name, align=align,
                                    size=size, mode='devdax')

        :return: True if command succeeds
        :rtype: bool
        :raise: :class:`PMemException`, if no medium is given, an option is
                not supported or the command fails.
        """
        if not (namespace or stdout or output):
            raise PMemException("Specify at least one output medium")
        args = self._write_infoblock_args(namespace, stdout, output, kwargs)
        write_cmd = f"{self.ndctl} write-infoblock {args}"
        if process.system(write_cmd, shell=True, ignore_status=True):
            raise PMemException("write-infoblock command failed")
        return True

    def read_infoblock(self, namespace="", inp_file="", **kwargs):
        """
        Read an infoblock from the specified medium

        :param namespace: Read the infoblock from given namespace
        :param inp_file: Input file to read the infoblock from
        :param kwargs: Further options: region and bus (only used with
                       namespace "all"), op_file, json_form (default True),
                       verify (default False)

            Example:
               self.plib.read_infoblock(namespace=ns_name, json_form=True)

        :return: By default return list of json objects, if json_form is True
                 Return as raw data, if json_form is False
                 Return file path if op_file is specified
        :raise: :class:`PMemException`, if command fails.
        """
        if not (namespace or inp_file):
            raise PMemException("Namespace or input file must be specified")
        args_dict = {"region": "-r %s", "bus": "-b %s", "op_file": "-o %s"}
        args = namespace if namespace else f"-i {inp_file}"
        args += self._filter_ns_infoblock(namespace, args_dict, kwargs)
        args += self._check_add_arg(args_dict, "op_file", kwargs)
        json_form = kwargs.pop("json_form", True)
        if kwargs.pop("verify", False):
            args += " -V"
        if json_form:
            args += " -j"
        read_cmd = f"{self.ndctl} read-infoblock {args}"
        ret = process.run(read_cmd, shell=True, ignore_status=True)
        if ret.exit_status:
            raise PMemException("read-infoblock command failed")
        # With an output file the data went to that file, so hand back its
        # path instead of the command output.
        if self._check_arg("op_file", kwargs):
            return kwargs.get("op_file")
        read_op = ret.stdout.decode()
        if json_form:
            read_op = json.loads(read_op)
        return read_op