avocado-framework/avocado

View on GitHub
avocado/utils/network/interfaces.py

Summary

Maintainability
D
2 days
Test Coverage
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
#
# Copyright: 2019-2020 IBM
# Copyright: 2019-2020 Red Hat Inc.
# Authors : Beraldo Leal <bleal@redhat.com>
#         : Praveen K Pandey <praveen@linux.vnet.ibm.com>
#         : Vaishnavi Bhat <vaishnavi@linux.vnet.ibm.com>

import json
import logging
import os
import re
import shutil
import subprocess
from ipaddress import IPv4Address, ip_interface

from avocado.utils import process
from avocado.utils.distro import detect as distro_detect
from avocado.utils.network.common import run_command
from avocado.utils.network.exceptions import NWException
from avocado.utils.wait import wait_for

LOG = logging.getLogger(__name__)


class NetworkInterface:
    """
    This class represents a network card interface (NIC).

    An "NetworkInterface" is attached to some host. This could be an instance
    of LocalHost or RemoteHost.  If a RemoteHost then all commands will be
    executed on a remote_session (host.remote_session). Otherwise will be
    executed locally.

    Here you will find a few methods to perform basic operations on a NIC.
    """

    def __init__(self, if_name, host, if_type="Ethernet"):
        self.name = if_name
        self.if_type = if_type
        self.host = host
        self.distro_is_rhel9 = False

    @property
    def config_filename(self):
        current_distro = distro_detect()
        if current_distro.name in ["rhel", "fedora"]:
            if self.distro_is_rhel9:
                path = "/etc/NetworkManager/system-connections"
            else:
                path = "/etc/sysconfig/network-scripts"
        elif current_distro.name == "SuSE":
            path = "/etc/sysconfig/network"
        else:
            msg = "Distro not supported by API. Could not get interface filename."
            raise NWException(msg)
        if self.distro_is_rhel9:
            return f"{path}/{self.name}.nmconnection"
        else:
            return f"{path}/ifcfg-{self.name}"

    @property
    def config_file_path(self):
        current_distro = distro_detect()
        if current_distro.name in ["rhel", "fedora"]:
            if self.distro_is_rhel9:
                return "/etc/NetworkManager/system-connections"
            else:
                return "/etc/sysconfig/network-scripts"
        elif current_distro.name == "SuSE":
            return "/etc/sysconfig/network"
        else:
            msg = "Distro not supported by API. Could not get interface filename."
            LOG.error(msg)

    @property
    def slave_config_filename(self):
        try:
            slave_dict = self._get_bondinterface_details()
            if self.distro_is_rhel9:
                return [
                    f"{self.config_file_path}/{slave}.nmconnection"
                    for slave in slave_dict["slaves"]
                ]
            else:
                return [
                    f"{self.config_file_path}/ifcfg-{slave}"
                    for slave in slave_dict["slaves"]
                ]
        except Exception:
            msg = "Slave config filename not available"
            LOG.debug(msg)
            return

    def _get_interface_details(self, version=None):
        cmd = f"ip -j link show {self.name}"
        if version:
            cmd = f"ip -{version} -j address show {self.name}"
        output = run_command(cmd, self.host)
        try:
            result = json.loads(output)
            for item in result:
                if item.get("ifname") == self.name:
                    return item
            raise NWException("Interface not found")
        except (NWException, json.JSONDecodeError):
            msg = f"Unable to get the details of interface {self.name}"
            LOG.error(msg)
            raise NWException(msg)

    def _get_bondinterface_details(self):
        cmd = (
            f"cat /sys/class/net/{self.name}/bonding/mode "
            f"/sys/class/net/{self.name}/bonding/slaves"
        )
        try:
            mode, slaves = run_command(cmd, self.host).splitlines()
            return {"mode": mode.split(), "slaves": slaves.split()}
        except Exception:
            raise NWException(f"Slave interface not found for " f"the bond {self.name}")

    def _move_file_to_backup(self, filename, ignore_missing=True):
        destination = f"{filename}.backup"
        if os.path.exists(filename):
            shutil.move(filename, destination)
        else:
            if not ignore_missing:
                raise NWException(f"{self.name} interface not available")

    def _write_to_file(self, filename, values):
        self._move_file_to_backup(filename)

        with open(filename, "w+") as fp:  # pylint: disable=W1514
            for key, value in values.items():
                fp.write(f"{key}={value}\n")

    def set_hwaddr(self, hwaddr):
        """Sets a Hardware Address (MAC Address) to the interface.

        This method will try to set a new hwaddr to this interface, if
        fails it will raise a NWException.

        You must have sudo permissions to run this method on a host.

        :param hwaddr: Hardware Address (Mac Address)
        """
        cmd = f"ip link set dev {self.name} address {hwaddr}"
        try:
            run_command(cmd, self.self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Adding hw address fails: {ex}")

    def add_ipaddr(self, ipaddr, netmask):
        """Add an IP Address (with netmask) to the interface.

        This method will try to add a new ipaddr/netmask this interface, if
        fails it will raise a NWException.

        You must have sudo permissions to run this method on a host.

        :param ipaddr: IP Address
        :param netmask: Network mask
        """

        ip = ip_interface(f"{ipaddr}/{netmask}")
        cmd = f"ip addr add {ip.compressed} dev {self.name}"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Failed to add address {ex}")

    @property
    def vlans(self):
        """Return all interface's VLAN.

        This is a dict were key is the VLAN number and the value is the name of
        the VLAN interface.

        rtype: dict
        """
        vlans = {}
        if not os.path.exists("/proc/net/vlan/config"):
            return vlans
        with open("/proc/net/vlan/config", encoding="utf-8") as vlan_config_file:
            for line in vlan_config_file:
                # entry is formatted as "vlan_name | vlan_id | parent_device"
                line = "".join(line.split())
                if line.endswith(self.name):
                    line = line.split("|")
                    vlans[line[1]] = line[0]
        return vlans

    def add_vlan_tag(self, vlan_num, vlan_name=None):
        """Configure 802.1Q VLAN tagging to the interface.

        This method will attempt to add a VLAN tag to this interface. If it
        fails, the method will raise a NWException.

        :param vlan_num: VLAN ID
        :param vlan_name: option to name VLAN interface, by default it is named
                          <interface_name>.<vlan_num>
        """

        vlan_name = vlan_name or f"{self.name}.{vlan_num}"
        cmd = (
            f"ip link add link {self.name} name {vlan_name} " f"type vlan id {vlan_num}"
        )
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Failed to add VLAN tag: {ex}")

    def remove_vlan_by_tag(self, vlan_num):
        """Remove the VLAN of the interface by tag number.

        This method will try to remove the VLAN tag of this interface. If it fails,
        the method will raise a NWException.

        :param vlan_num: VLAN ID
        :return: True or False, True if it found the VLAN interface and removed
                 it successfully, otherwise it will return False.
        """
        if str(vlan_num) in self.vlans:
            vlan_name = self.vlans[str(vlan_num)]
        else:
            return False
        cmd = f"ip link delete {vlan_name}"

        try:
            run_command(cmd, self.host, sudo=True)
            return True
        except Exception as ex:
            raise NWException(f"Failed to remove VLAN interface: {ex}")

    def remove_all_vlans(self):
        """Remove all VLANs of this interface.

        This method will remove all the VLAN interfaces associated by the
        interface. If it fails, the method will raise a NWException.
        """
        try:
            for v in self.vlans.values():
                cmd = f"ip link delete {v}"
                run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Failed to remove VLAN interface: {ex}")

    def bring_down(self):
        """Shutdown the interface.

        This will shutdown the interface link. Be careful, you might lost
        connection to the host.

        You must have sudo permissions to run this method on a host.
        """

        cmd = f"ip link set {self.name} down"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Failed to bring down: {ex}")

    def bring_up(self):
        """ "Wake-up the interface.

        This will wake-up the interface link.

        You must have sudo permissions to run this method on a host.
        """
        cmd = f"ip link set {self.name} up"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            raise NWException(f"Failed to bring up: {ex}")

    def is_admin_link_up(self):
        """Check the admin link state is up or not.

        :return: True or False, True if network interface state is 'UP'
                 otherwise will return False.
        """
        try:
            if "UP" in self._get_interface_details().get("flags"):
                return True
        except (NWException, IndexError):
            raise NWException("Could not get Administrative link state.")
        return False

    def is_operational_link_up(self):
        """Check Operational link state is up or not.

        :return: True or False. True if operational link state is LOWER_UP,
                 otherwise will return False.
        """
        try:
            if "LOWER_UP" in self._get_interface_details().get("flags"):
                return True
        except (NWException, IndexError):
            raise NWException("Could not get operational link state.")
        return False

    def is_link_up(self):
        """Check if the interface is up or not.

        :return: True or False. True if admin link state and operational
                 link state is up otherwise will return False.
        """
        return self.is_admin_link_up() and self.is_operational_link_up()

    def get_ipaddrs(self, version=4):
        """Get the IP addresses from a network interface.

        Interfaces can hold multiple IP addresses. This method will return a
        list with all addresses on this interface.

        :param version: Address Family Version (4 or 6). This must be a integer
                        and default is 4.
        :return: IP address as string.
        """
        if version not in [4, 6]:
            raise NWException(f"Version {version} not supported")

        try:
            details = self._get_interface_details(version)
            addr_info = details.get("addr_info")
            if addr_info:
                return [x.get("local") for x in addr_info]
        except (NWException, IndexError):
            msg = f"Could not get ip addresses for {self.name}"
            LOG.debug(msg)
            return []

    def get_hwaddr(self):
        """Get the Hardware Address (MAC) of this interface.

        This method will try to get the address and if fails it will raise a
        NWException.
        """
        cmd = f"cat /sys/class/net/{self.name}/address"
        try:
            return run_command(cmd, self.host)
        except Exception as ex:
            raise NWException(f"Failed to get hw address: {ex}")

    def get_mtu(self):
        """Return the current MTU value of this interface.

        This method will try to get the current MTU value, if fails will
        raise a NWException.
        """
        try:
            return self._get_interface_details().get("mtu")
        except (NWException, IndexError):
            raise NWException("Could not get MUT value.")

    def ping_check(self, peer_ip, count=2, options=None):
        """This method will try to ping a peer address (IPv4 or IPv6).

        You should provide a IPv4 or IPV6 that would like to ping. This
        method will try to ping the peer and if fails it will raise a
        NWException.

        :param peer_ip: Peer IP address (IPv4 or IPv6)
        :param count: How many packets to send. Default is 2
        :param options: ping command options. Default is None
        """
        cmd = f"ping -I {self.name} {peer_ip} -c {count}"
        if options is not None:
            cmd = f"{cmd} {options}"
        try:
            run_command(cmd, self.host)
        except Exception as ex:
            raise NWException(f"Failed to ping: {ex}")

    def save(self, ipaddr, netmask):
        """Save current interface IP Address to the system configuration file.

        If the ipaddr is valid (currently being used by the interface)
        this will try to save the current settings into /etc/. This
        check is necessary to avoid inconsistency. Before save, you
        should add_ipaddr, first.

        Currently, only RHEL, Fedora and SuSE are supported. And this
        will create a backup file of your current configuration if
        found.

        :param ipaddr : IP Address which need to configure for interface
        :param netmask: Network mask which is associated to the provided IP
        """
        if ipaddr not in self.get_ipaddrs():
            msg = (
                "ipaddr not configured on interface. To avoid "
                "inconsistency, please add the ipaddr first."
            )
            raise NWException(msg)

        current_distro = distro_detect()
        if current_distro.name == "rhel" and current_distro.version == "9":
            self.distro_is_rhel9 = "rhel9"

        filename = f"ifcfg-{self.name}"
        prefix = self.netmask_to_cidr(netmask)
        if current_distro.name in ["rhel", "fedora"]:
            if self.distro_is_rhel9:
                filename = f"{self.name}.nmconnection"
                path = "/etc/NetworkManager/system-connections"
            else:
                path = "/etc/sysconfig/network-scripts"
        elif current_distro.name == "SuSE":
            path = "/etc/sysconfig/network"
        else:
            msg = "Distro not supported by API. Could not save ipaddr."
            raise NWException(msg)

        if self.distro_is_rhel9:
            ifcfg_dict = ""
            if os.path.exists(f"{path}/{filename}") is False:
                run_command(
                    f"nmcli connection add con-name {self.name} ifname {self.name} type ethernet ipv4.address {ipaddr}/{prefix}",
                    self.host,
                )
            self._move_file_to_backup(f"{path}/{filename}")
            if os.path.exists(f"{path}/{filename}.backup"):
                destination = f"{path}/{filename}"
                shutil.copy(f"{path}/{filename}.backup", destination)
            run_command(
                f"nmcli c mod id {self.name} ipv4.method manual ipv4.address {ipaddr}/{prefix}",
                self.host,
            )
            run_command(f"nmcli connection up {self.name}", self.host)
        else:
            ifcfg_dict = {
                "TYPE": self.if_type,
                "BOOTPROTO": "static",
                "NAME": self.name,
                "DEVICE": self.name,
                "ONBOOT": "yes",
                "IPADDR": ipaddr,
                "NETMASK": netmask,
                "IPV6INIT": "yes",
                "IPV6_AUTOCONF": "yes",
                "IPV6_DEFROUTE": "yes",
            }
            if current_distro.name == "SuSE":
                ifcfg_dict = {
                    "IPADDR": f"{ipaddr}/{prefix}",
                    "BOOTPROTO": "static",
                    "STARTMODE": "auto",
                }
            self._write_to_file(f"{path}/{filename}", ifcfg_dict)

        if self.if_type == "Bond":
            bond_dict = self._get_bondinterface_details()
            if self.distro_is_rhel9:
                if os.path.exists(f"{path}/{filename}") is False:
                    run_command(
                        f"nmcli connection add con-name {self.name} ifname {self.name} type ethernet ipv4.address {ipaddr}/{prefix}",
                        self.host,
                    )
                self._move_file_to_backup(f"{path}/{filename}")
                if os.path.exists(f"{path}/{filename}.backup"):
                    destination = f"{path}/{filename}"
                    shutil.copy(f"{path}/{filename}.backup", destination)
                run_command(
                    f"nmcli c mod id {self.name} ipv4.method manual ipv4.address {ipaddr}/{prefix}",
                    self.host,
                )
                run_command(f"nmcli connection up {self.name}", self.host)
            else:
                ifcfg_dict["BONDING_MASTER"] = "yes"
                ifcfg_slave_dict = {
                    "SLAVE": "yes",
                    "ONBOOT": "yes",
                    "MASTER": self.name,
                }
                if current_distro.name == "SuSE":
                    ifcfg_dict["BONDING_MODULE_OPTS"] = "mode=" + bond_dict["mode"][0]
                    for index, slave in enumerate(bond_dict["slaves"]):
                        bonding_slave = f"BONDING_SLAVE{index}"
                        ifcfg_dict[bonding_slave] = slave
                        ifcfg_slave_dict.update({"NAME": slave, "DEVICE": slave})
                        self._write_to_file(f"{path}/ifcfg-{slave}", ifcfg_slave_dict)
                elif current_distro.name in ["rhel", "fedora"]:
                    ifcfg_dict["BONDING_OPTS"] = "mode=" + bond_dict["mode"][0]
                    for index, slave in enumerate(bond_dict["slaves"]):
                        ifcfg_slave_dict.update(
                            {"NAME": slave, "DEVICE": slave, "TYPE": "Ethernet"}
                        )
                        self._write_to_file(f"{path}/ifcfg-{slave}", ifcfg_slave_dict)
                else:
                    msg = "Distro not supported by API. Could not save ipaddr."
                    raise NWException(msg)

    def set_mtu(self, mtu, timeout=30):
        """Sets a new MTU value to this interface.

        This method will try to set a new MTU value to this interface,
        if fails it will raise a NWException. Also it will wait until
        the Interface is up before returning or until timeout be
        reached.

        You must have sudo permissions to run this method on a host.

        :param mtu:  mtu size that need to be set. This must be an int.
        :param timeout: how many seconds to wait until the interface is
                        up again. Default is 30.
        """
        cmd = f"ip link set {self.name} mtu {mtu}"
        run_command(cmd, self.host, sudo=True)
        wait_for(self.is_link_up, timeout=timeout)
        if int(mtu) != self.get_mtu():
            raise NWException("Failed to set MTU.")

    def remove_ipaddr(self, ipaddr, netmask):
        """Removes an IP address from this interface.

        This method will try to remove the address from this interface
        and if fails it will raise a NWException. Be careful, you can
        lost connection.

        You must have sudo permissions to run this method on a host.
        """
        ip = ip_interface(f"{ipaddr}/{netmask}")
        cmd = f"ip addr del {ip.compressed} dev {self.name}"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            msg = f"Failed to remove ipaddr. {ex}"
            raise NWException(msg)

    def flush_ipaddr(self):
        """Flush all the IP address for this interface.

        This method will try to flush the ip address from this interface
        and if fails it will raise a NWException. Be careful, you can
        lost connection.

        You must have sudo permissions to run this method on a host.
        """
        cmd = f"ip addr flush dev {self.name}"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            msg = f"Failed to flush ipaddr. {ex}"
            raise NWException(msg)

    def remove_link(self):
        """Deletes virtual interface link.

        This method will try to delete the virtual device link and the
        interface will no more be listed with 'ip a' and if fails it
        will raise a NWException. Be careful, you can lost connection.

        You must have sudo permissions to run this method on a host.
        """
        cmd = f"ip link del dev {self.name}"
        try:
            run_command(cmd, self.host, sudo=True)
        except Exception as ex:
            msg = f"Failed to delete link. {ex}"
            raise NWException(msg)

    def restore_from_backup(self):
        """Revert interface file from backup.

        This method checks if a backup version  is available for given
        interface then it copies backup file to interface file in /sysfs path.
        """

        backup_file = f"{self.config_filename}.backup"
        if os.path.exists(backup_file):
            shutil.move(backup_file, self.config_filename)
        else:
            raise NWException("Backup file not available, could not restore file.")

    def is_available(self):
        """Check if interface is available.

        This method checks if the interface is available.

        rtype: bool
        """
        cmd = f"ip link show dev {self.name}"
        try:
            run_command(cmd, self.host)
            return True
        except Exception as ex:
            msg = f"Interface {self.name} is not available. {ex}"
            LOG.debug(msg)
            return False

    def is_bond(self):
        """Check if interface is a bonding device.

        This method checks if the interface is a bonding device or not.

        rtype: bool
        """
        cmd = f"cat /proc/net/bonding/{self.name}"
        try:
            run_command(cmd, self.host)
            return True
        except Exception as ex:
            msg = f"{self.name} is not a bond device. {ex}"
            LOG.debug(msg)
            return False

    def is_veth(self):
        """Check if interface is a Virtual Ethernet.

        This method checks if the interface is a Virtual Ethernet or not.

        rtype: bool
        """
        if not os.path.isfile(f"/sys/class/net/{self.name}/device/devspec"):
            raise NWException("Network interface sysfs file does not exists")
        cmd = f"cat /sys/class/net/{self.name}/device/devspec"
        output = run_command(cmd, self.host)
        if "l-lan" in output:
            return True
        return False

    def is_vnic(self):
        """Check if interface is a virtual network.

        This method checks if the interface is a virtual NIC or not.

        rtype: bool
        """
        if not os.path.isfile(f"/sys/class/net/{self.name}/device/devspec"):
            raise NWException("Network interface sysfs file does not exists")
        cmd = f"cat /sys/class/net/{self.name}/device/devspec"
        output = run_command(cmd, self.host)
        if "vnic" in output:
            return True
        return False

    def is_sriov(self):
        """Check if interface is a SRIOV virtual interface.

        This method checks if the interface is SRIOV logical interface or not.

        rtype: bool
        """
        if not os.path.isfile(f"/sys/class/net/{self.name}/device/vpd"):
            raise NWException("Network interface sysfs file does not exists")
        cmd = f"cat /sys/class/net/{self.name}/device/vpd"
        output = run_command(cmd, self.host)
        for vpd in output.split():
            if "VF" in vpd and vpd.endswith("SN"):
                return True
        return False

    def remove_cfg_file(self):
        """
        Remove any config files that is created as a part of the test
        """
        if os.path.isfile(self.config_filename):
            os.remove(self.config_filename)

    def restore_slave_cfg_file(self):
        """
        Restore or delete slave config files.
        """
        if self.if_type != "Bond":
            return
        for slave_config in self.slave_config_filename:
            backup_slave_config = f"{slave_config}.backup"
            try:
                if os.path.exists(backup_slave_config):
                    shutil.move(backup_slave_config, slave_config)
                else:
                    os.remove(slave_config)
            except Exception as ex:
                raise NWException(f"Could not restore the config file {ex}")

    def are_packets_lost(self, peer_ip, options=None, sudo=False):
        """Check packet loss that occurs during ping.

        Function returns True for 0% packet loss and False
        if packet loss occurs.

        :param peer_ip: Peer IP address (IPv4 or IPv6)
        :param options: Type is List. Options such as -c, -f. Default is None
        :param sudo: If sudo permissions are needed. Default is False
        """
        cmd = f"ping -I {self.name} {peer_ip}"
        cmd = f"{cmd} "
        if options is not None:
            for elem in options:
                cmd += f"{elem} "
        try:
            output = run_command(cmd, self.host, sudo=sudo)
            if "0% packet loss" not in output:
                return False
            return True
        except Exception as ex:
            msg = f"Failed to ping. {ex}"
            raise NWException(msg)

    def netmask_to_cidr(self, netmask):
        """Function is used to check the netmask value and convert

        it into short form (mask) of netmask values
        Example : 255.255.255.0 = 24
        255.255.252.0 = 22

        :param netmask: Netmask value example 255.255.255.0
        :return: Returns mask value of given netmask
        """
        return sum(bin(int(bits)).count("1") for bits in netmask.split("."))

    def validate_ipv4_format(self, ip):
        """
        This function validates IPv4 address with following format set.

        1. A string in decimal-dot notation, consisting of four decimal
           integers in the inclusive range 0-255,separated by dots
           (e.g- 192.168.0.1).Each integer represents an octet in the address.
        2. An integer that fits into 32 bits.
        3. An integer packed into a bytes object of length 4.

        And for IP address which are not met above conditions,
        raises AddressValueError and returns False.

        :param ip: IP address
        :type pattern: str
        :return: True when IP address pattern/format matches if not
                 return False
        :rtype: boolean
        """
        try:
            IPv4Address(ip)
        except Exception as ex:
            LOG.debug("Failed to validate IP format %s", ex)
            return False
        return True

    def validate_ipv4_netmask_format(self, netmask):
        """
        This function validates IPv4 Netmask address with following format set.

        1. A string in decimal-dot notation,consisting of four decimal integers
           starting from 255 and octets separated by dots (e.g 255.255.255.0)
        2. An integer packed into a bytes object of length 4

        And for Netmask which are not met above conditions,
        [ eg : 255.0.255.0, 255.255.255, 255.255.255.256, 255.255.255.255.0]
        returns False.

        :param netmask: netmask address
        :type pattern: str
        :return: True when netmask address pattern/format matches if not
                 return False
        :rtype: boolean
        """
        netmask_list = netmask.split(".")
        if len(netmask_list) != 4:
            return False
        for octet in netmask_list:
            num = int(octet)
            if not 0 <= num <= 255:
                return False
        octet_bin = [format(int(i), "08b") for i in netmask_list]
        binary_netmask = ("").join(octet_bin)
        accept_zero_only = False
        first_bit = True
        for symbol in binary_netmask:
            if accept_zero_only and symbol == "1":
                return False
            elif symbol == "0":
                accept_zero_only = True
            if first_bit and symbol == "0":
                return False
            first_bit = False
        return True

    def ping_flood(self, int_name, peer_ip, ping_count):
        """
        Function to start ping to remote machine with "-f" [ flood ] option,
        on given interface.

        Also this function enables to track the live data to determine the
        ping flood failure, in case of failure the program will exit.

        :param int_name: source interface name.
        :param peer_ip: Peer IP address (IPv4 or IPv6)
        :param ping_count: How many ICMP echo packets to send.
        :return: returns True on successful ping flood.
                  returns False on ping flood failure.
        :rtype: boolean
        """
        cmd = f"ping -I {int_name} {peer_ip} -c {ping_count} -f "
        ping_process = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        pattern = r"\.{10}"
        while True:
            char = ping_process.stdout.read(100)
            match = re.search(pattern, char)
            if match:
                ping_process.terminate()
                msg = "ping flood failed to remote machine, Please check the logs"
                LOG.debug(msg)
                return False
            return True
        ping_process.stdout.close()
        ping_process.wait()

    def get_device_IPI_name(self):
        """
        Function to convert IO device name to device_ipi names according to
        "/proc/interrupts" context.
        Ex: vnic@30000009 to vnic-30000009

        :return : A converted Network device according to device_ipi name.
        :rtype : string
        """

        if self.is_vnic():
            cmd = (
                f"cat /sys/class/net/{self.name}/device/devspec | "
                f"awk -F/ '{{print $3}}'"
            )
            interface_type = process.system_output(
                cmd, shell=True, ignore_status=True
            ).decode("utf-8")
            cmd = f"echo {interface_type} | sed 's/@/-/' "
            interface_type = process.system_output(
                cmd, shell=True, ignore_status=True
            ).decode("utf-8")
            return interface_type
        elif self.is_veth():
            return self.name