torvalds/linux

View on GitHub
tools/testing/selftests/drivers/net/mlxsw/sharedbuffer_configuration.py

Summary

Maintainability
F
4 days
Test Coverage
#!/usr/bin/env python
# SPDX-License-Identifier: GPL-2.0

import subprocess
import json as j
import random


class SkipTest(Exception):
    pass


class RandomValuePicker:
    """
    Class for storing shared buffer configuration. Can handle 3 different
    objects, pool, tcbind and portpool. Provide an interface to get random
    values for a specific object type as the follow:
      1. Pool:
         - random size

      2. TcBind:
         - random pool number
         - random threshold

      3. PortPool:
         - random threshold
    """
    def __init__(self, pools):
        self._pools = []
        for pool in pools:
            self._pools.append(pool)

    def _cell_size(self):
        return self._pools[0]["cell_size"]

    def _get_static_size(self, th):
        # For threshold of 16, this works out to be about 12MB on Spectrum-1,
        # and about 17MB on Spectrum-2.
        return th * 8000 * self._cell_size()

    def _get_size(self):
        return self._get_static_size(16)

    def _get_thtype(self):
        return "static"

    def _get_th(self, pool):
        # Threshold value could be any integer between 3 to 16
        th = random.randint(3, 16)
        if pool["thtype"] == "dynamic":
            return th
        else:
            return self._get_static_size(th)

    def _get_pool(self, direction):
        ing_pools = []
        egr_pools = []
        for pool in self._pools:
            if pool["type"] == "ingress":
                ing_pools.append(pool)
            else:
                egr_pools.append(pool)
        if direction == "ingress":
            arr = ing_pools
        else:
            arr = egr_pools
        return arr[random.randint(0, len(arr) - 1)]

    def get_value(self, objid):
        if isinstance(objid, Pool):
            if objid["pool"] in [4, 8, 9, 10]:
                # The threshold type of pools 4, 8, 9 and 10 cannot be changed
                raise SkipTest()
            else:
                return (self._get_size(), self._get_thtype())
        if isinstance(objid, TcBind):
            if objid["tc"] >= 8:
                # Multicast TCs cannot be changed
                raise SkipTest()
            else:
                pool = self._get_pool(objid["type"])
                th = self._get_th(pool)
                pool_n = pool["pool"]
                return (pool_n, th)
        if isinstance(objid, PortPool):
            pool_n = objid["pool"]
            pool = self._pools[pool_n]
            assert pool["pool"] == pool_n
            th = self._get_th(pool)
            return (th,)


class RecordValuePickerException(Exception):
    pass


class RecordValuePicker:
    """
    Class for storing shared buffer configuration. Can handle 2 different
    objects, pool and tcbind. Provide an interface to get the stored values per
    object type.
    """
    def __init__(self, objlist):
        self._recs = []
        for item in objlist:
            self._recs.append({"objid": item, "value": item.var_tuple()})

    def get_value(self, objid):
        if isinstance(objid, Pool) and objid["pool"] in [4, 8, 9, 10]:
            # The threshold type of pools 4, 8, 9 and 10 cannot be changed
            raise SkipTest()
        if isinstance(objid, TcBind) and objid["tc"] >= 8:
            # Multicast TCs cannot be changed
            raise SkipTest()
        for rec in self._recs:
            if rec["objid"].weak_eq(objid):
                return rec["value"]
        raise RecordValuePickerException()


def run_cmd(cmd, json=False):
    out = subprocess.check_output(cmd, shell=True)
    if json:
        return j.loads(out)
    return out


def run_json_cmd(cmd):
    return run_cmd(cmd, json=True)


def log_test(test_name, err_msg=None):
    if err_msg:
        print("\t%s" % err_msg)
        print("TEST: %-80s  [FAIL]" % test_name)
    else:
        print("TEST: %-80s  [ OK ]" % test_name)


class CommonItem(dict):
    varitems = []

    def var_tuple(self):
        ret = []
        self.varitems.sort()
        for key in self.varitems:
            ret.append(self[key])
        return tuple(ret)

    def weak_eq(self, other):
        for key in self:
            if key in self.varitems:
                continue
            if self[key] != other[key]:
                return False
        return True


class CommonList(list):
    def get_by(self, by_obj):
        for item in self:
            if item.weak_eq(by_obj):
                return item
        return None

    def del_by(self, by_obj):
        for item in self:
            if item.weak_eq(by_obj):
                self.remove(item)


class Pool(CommonItem):
    varitems = ["size", "thtype"]

    def dl_set(self, dlname, size, thtype):
        run_cmd("devlink sb pool set {} sb {} pool {} size {} thtype {}".format(dlname, self["sb"],
                                                                                self["pool"],
                                                                                size, thtype))


class PoolList(CommonList):
    pass


def get_pools(dlname, direction=None):
    d = run_json_cmd("devlink sb pool show -j")
    pools = PoolList()
    for pooldict in d["pool"][dlname]:
        if not direction or direction == pooldict["type"]:
            pools.append(Pool(pooldict))
    return pools


def do_check_pools(dlname, pools, vp):
    for pool in pools:
        pre_pools = get_pools(dlname)
        try:
            (size, thtype) = vp.get_value(pool)
        except SkipTest:
            continue
        pool.dl_set(dlname, size, thtype)
        post_pools = get_pools(dlname)
        pool = post_pools.get_by(pool)

        err_msg = None
        if pool["size"] != size:
            err_msg = "Incorrect pool size (got {}, expected {})".format(pool["size"], size)
        if pool["thtype"] != thtype:
            err_msg = "Incorrect pool threshold type (got {}, expected {})".format(pool["thtype"], thtype)

        pre_pools.del_by(pool)
        post_pools.del_by(pool)
        if pre_pools != post_pools:
            err_msg = "Other pool setup changed as well"
        log_test("pool {} of sb {} set verification".format(pool["pool"],
                                                            pool["sb"]), err_msg)


def check_pools(dlname, pools):
    # Save defaults
    record_vp = RecordValuePicker(pools)

    # For each pool, set random size and static threshold type
    do_check_pools(dlname, pools, RandomValuePicker(pools))

    # Restore defaults
    do_check_pools(dlname, pools, record_vp)


class TcBind(CommonItem):
    varitems = ["pool", "threshold"]

    def __init__(self, port, d):
        super(TcBind, self).__init__(d)
        self["dlportname"] = port.name

    def dl_set(self, pool, th):
        run_cmd("devlink sb tc bind set {} sb {} tc {} type {} pool {} th {}".format(self["dlportname"],
                                                                                     self["sb"],
                                                                                     self["tc"],
                                                                                     self["type"],
                                                                                     pool, th))


class TcBindList(CommonList):
    pass


def get_tcbinds(ports, verify_existence=False):
    d = run_json_cmd("devlink sb tc bind show -j -n")
    tcbinds = TcBindList()
    for port in ports:
        err_msg = None
        if port.name not in d["tc_bind"] or len(d["tc_bind"][port.name]) == 0:
            err_msg = "No tc bind for port"
        else:
            for tcbinddict in d["tc_bind"][port.name]:
                tcbinds.append(TcBind(port, tcbinddict))
        if verify_existence:
            log_test("tc bind existence for port {} verification".format(port.name), err_msg)
    return tcbinds


def do_check_tcbind(ports, tcbinds, vp):
    for tcbind in tcbinds:
        pre_tcbinds = get_tcbinds(ports)
        try:
            (pool, th) = vp.get_value(tcbind)
        except SkipTest:
            continue
        tcbind.dl_set(pool, th)
        post_tcbinds = get_tcbinds(ports)
        tcbind = post_tcbinds.get_by(tcbind)

        err_msg = None
        if tcbind["pool"] != pool:
            err_msg = "Incorrect pool (got {}, expected {})".format(tcbind["pool"], pool)
        if tcbind["threshold"] != th:
            err_msg = "Incorrect threshold (got {}, expected {})".format(tcbind["threshold"], th)

        pre_tcbinds.del_by(tcbind)
        post_tcbinds.del_by(tcbind)
        if pre_tcbinds != post_tcbinds:
            err_msg = "Other tc bind setup changed as well"
        log_test("tc bind {}-{} of sb {} set verification".format(tcbind["dlportname"],
                                                                  tcbind["tc"],
                                                                  tcbind["sb"]), err_msg)


def check_tcbind(dlname, ports, pools):
    tcbinds = get_tcbinds(ports, verify_existence=True)

    # Save defaults
    record_vp = RecordValuePicker(tcbinds)

    # Bind each port and unicast TC (TCs < 8) to a random pool and a random
    # threshold
    do_check_tcbind(ports, tcbinds, RandomValuePicker(pools))

    # Restore defaults
    do_check_tcbind(ports, tcbinds, record_vp)


class PortPool(CommonItem):
    varitems = ["threshold"]

    def __init__(self, port, d):
        super(PortPool, self).__init__(d)
        self["dlportname"] = port.name

    def dl_set(self, th):
        run_cmd("devlink sb port pool set {} sb {} pool {} th {}".format(self["dlportname"],
                                                                         self["sb"],
                                                                         self["pool"], th))


class PortPoolList(CommonList):
    pass


def get_portpools(ports, verify_existence=False):
    d = run_json_cmd("devlink sb port pool -j -n")
    portpools = PortPoolList()
    for port in ports:
        err_msg = None
        if port.name not in d["port_pool"] or len(d["port_pool"][port.name]) == 0:
            err_msg = "No port pool for port"
        else:
            for portpooldict in d["port_pool"][port.name]:
                portpools.append(PortPool(port, portpooldict))
        if verify_existence:
            log_test("port pool existence for port {} verification".format(port.name), err_msg)
    return portpools


def do_check_portpool(ports, portpools, vp):
    for portpool in portpools:
        pre_portpools = get_portpools(ports)
        (th,) = vp.get_value(portpool)
        portpool.dl_set(th)
        post_portpools = get_portpools(ports)
        portpool = post_portpools.get_by(portpool)

        err_msg = None
        if portpool["threshold"] != th:
            err_msg = "Incorrect threshold (got {}, expected {})".format(portpool["threshold"], th)

        pre_portpools.del_by(portpool)
        post_portpools.del_by(portpool)
        if pre_portpools != post_portpools:
            err_msg = "Other port pool setup changed as well"
        log_test("port pool {}-{} of sb {} set verification".format(portpool["dlportname"],
                                                                    portpool["pool"],
                                                                    portpool["sb"]), err_msg)


def check_portpool(dlname, ports, pools):
    portpools = get_portpools(ports, verify_existence=True)

    # Save defaults
    record_vp = RecordValuePicker(portpools)

    # For each port pool, set a random threshold
    do_check_portpool(ports, portpools, RandomValuePicker(pools))

    # Restore defaults
    do_check_portpool(ports, portpools, record_vp)


class Port:
    def __init__(self, name):
        self.name = name


class PortList(list):
    pass


def get_ports(dlname):
    d = run_json_cmd("devlink port show -j")
    ports = PortList()
    for name in d["port"]:
        if name.find(dlname) == 0 and d["port"][name]["flavour"] == "physical":
            ports.append(Port(name))
    return ports


def get_device():
    devices_info = run_json_cmd("devlink -j dev info")["info"]
    for d in devices_info:
        if "mlxsw_spectrum" in devices_info[d]["driver"]:
            return d
    return None


class UnavailableDevlinkNameException(Exception):
    pass


def test_sb_configuration():
    # Use static seed
    random.seed(0)

    dlname = get_device()
    if not dlname:
        raise UnavailableDevlinkNameException()

    ports = get_ports(dlname)
    pools = get_pools(dlname)

    check_pools(dlname, pools)
    check_tcbind(dlname, ports, pools)
    check_portpool(dlname, ports, pools)


test_sb_configuration()