avocado-framework/avocado

View on GitHub
avocado/utils/data_structures.py

Summary

Maintainability
C
1 day
Test Coverage
B
89%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
#            IBM, 2023
#
# Authors: Ruda Moura <rmoura@redhat.com>
#          Lucas Meneghel Rodrigues <lmr@redhat.com>
#          Harish S <harisrir@linux.vnet.ibm.com>
#          Maram Srimannarayana Murthy <Maram.Srimannarayana.Murthy@ibm.com>
#

"""
This module contains handy classes that can be used inside
avocado core code or plugins.
"""


import math
import sys


class InvalidDataSize(ValueError):
    """
    Signals that the value given to :class:`DataSize` is not valid.
    """


def ordered_list_unique(object_list):
    """
    Returns an unique list of objects, with their original order preserved
    """
    seen = set()
    seen_add = seen.add
    return [x for x in object_list if not (x in seen or seen_add(x))]


def geometric_mean(values):
    """
    Evaluates the geometric mean for a list of numeric values.
    This implementation is slower but allows unlimited number of values.
    :param values: List with values.
    :return: Single value representing the geometric mean for the list values.
    :see: http://en.wikipedia.org/wiki/Geometric_mean
    """
    try:
        values = [int(value) for value in values]
    except ValueError:
        raise ValueError(f"Invalid inputs {values}. Provide valid inputs")
    no_values = len(values)
    if no_values == 0:
        return None
    return math.exp(sum(math.log(number) for number in values) / no_values)


def compare_matrices(matrix1, matrix2, threshold=0.05):
    """
    Compare 2 matrices nxm and return a matrix nxm with comparison data and
    stats. When the first columns match, they are considered as header and
    included in the results intact.

    :param matrix1: Reference Matrix of floats; first column could be header.
    :param matrix2: Matrix that will be compared; first column could be header
    :param threshold: Any difference greater than this percent threshold will
                      be reported.
    :return: Matrix with the difference in comparison, number of improvements,
             number of regressions, total number of comparisons.
    """
    improvements = 0
    regressions = 0
    same = 0
    new_matrix = []

    for line1, line2 in zip(matrix1, matrix2):
        new_line = []
        elements = iter(zip(line1, line2))
        try:
            element1, element2 = next(elements)
        except StopIteration:  # no data in this row
            new_matrix.append(new_line)
            continue
        if element1 == element2:  # this column contains header
            new_line.append(element1)
            try:
                element1, element2 = next(elements)
            except StopIteration:
                new_matrix.append(new_line)
                continue
        while True:
            try:
                ratio = float(element2) / float(element1)
            except ZeroDivisionError:  # For 0s, allow exact match or error
                if float(element2) == 0:
                    new_line.append(".")
                    same += 1
                else:
                    new_line.append(f"error_{element2}/{element1}")
                    improvements += 1
                try:
                    element1, element2 = next(elements)
                except StopIteration:
                    break
                continue
            if ratio < (1 - threshold):  # handling regression
                regressions += 1
                new_line.append(100 * ratio - 100)
            elif ratio > (1 + threshold):  # handling improvements
                improvements += 1
                new_line.append(f"+{100 * ratio - 100:.6g}")
            else:
                same += 1
                new_line.append(".")
            try:
                element1, element2 = next(elements)
            except StopIteration:
                break
        new_matrix.append(new_line)

    total = improvements + regressions + same
    return (new_matrix, improvements, regressions, total)


def comma_separated_ranges_to_list(string):
    """
    Provides a list from comma separated ranges

    :param string: string of comma separated range
    :return list: list of integer values in comma separated range
    """
    values = []
    for range_str in string.split(","):
        if "-" in range_str:
            start, end = range_str.split("-")
            values.extend(range(int(start), int(end) + 1))
        else:
            values.append(int(range_str))
    return values


def recursive_compare_dict(dict1, dict2, level="DictKey", diff_btw_dict=None):
    """
    Difference between two dictionaries are returned
    Dict values can be a dictionary, list and value

    :rtype: list or None
    """
    if isinstance(dict1, dict) and isinstance(dict2, dict):
        if dict1.keys() != dict2.keys():
            set1 = set(dict1.keys())
            set2 = set(dict2.keys())
            diff_btw_dict.append(f"{level} + {set1-set2} - {set2-set1}")
            common_keys = set1 & set2
        else:
            common_keys = set(dict1.keys())
        for k in common_keys:
            recursive_compare_dict(
                dict1[k], dict2[k], level=f"{level}.{k}", diff_btw_dict=diff_btw_dict
            )
        return diff_btw_dict
    elif isinstance(dict1, list) and isinstance(dict2, list):
        if len(dict1) != len(dict2):
            diff_btw_dict.append(f"{level} + {len(dict1)} - {len(dict2)}")
        common_len = min(len(dict1), len(dict2))
        for i in range(common_len):
            recursive_compare_dict(
                dict1[i],
                dict2[i],
                level=f"{level}.{dict1[i]}",
                diff_btw_dict=diff_btw_dict,
            )
    else:
        if dict1 != dict2:
            diff_btw_dict.append(f"{level} - dict1 value:{dict1}, dict2 value:{dict2}")


class Borg:
    """
    Multiple instances of this class will share the same state.

    This is considered a better design pattern in Python than
    more popular patterns, such as the Singleton. Inspired by
    Alex Martelli's article mentioned below:

    :see: http://www.aleax.it/5ep.html
    """

    __shared_state = {}

    def __init__(self):
        self.__dict__ = self.__shared_state


class LazyProperty:
    """
    Lazily instantiated property.

    Use this decorator when you want to set a property that will only be
    evaluated the first time it's accessed. Inspired by the discussion in
    the Stack Overflow thread below:

    :see: http://stackoverflow.com/questions/15226721/
    """

    def __init__(self, f_get):
        self.f_get = f_get
        self.func_name = f_get.__name__

    def __get__(self, obj, cls):
        if obj is None:
            return None
        value = self.f_get(obj)
        setattr(obj, self.func_name, value)
        return value


class CallbackRegister:
    """
    Registers pickable functions to be executed later.
    """

    def __init__(self, name, log):
        """
        :param name: Human readable identifier of this register
        """
        self._name = name
        self._items = []
        self._log = log

    def register(self, func, args, kwargs, once=False):
        """
        Register function/args to be called on self.destroy()
        :param func: Pickable function
        :param args: Pickable positional arguments
        :param kwargs: Pickable keyword arguments
        :param once: Add unique (func,args,kwargs) combination only once
        """
        item = (func, args, kwargs)
        if not once or item not in self._items:
            self._items.append(item)

    def unregister(self, func, args, kwargs):
        """
        Unregister (func,args,kwargs) combination
        :param func: Pickable function
        :param args: Pickable positional arguments
        :param kwargs: Pickable keyword arguments
        """
        item = (func, args, kwargs)
        if item in self._items:
            self._items.remove(item)

    def run(self):
        """
        Call all registered function
        """
        while self._items:
            item = self._items.pop()
            try:
                func, args, kwargs = item
                func(*args, **kwargs)
            except:  # Ignore all exceptions pylint: disable=W0702
                self._log.error(
                    "%s failed to destroy %s:\n%s", self._name, item, sys.exc_info()[1]
                )

    def __del__(self):
        """
        :warning: Always call self.run() manually, this is not guaranteed
                  to be executed!
        """
        self.run()


def time_to_seconds(time):
    """
    Convert time in minutes, hours and days to seconds.
    :param time: Time, optionally including the unit (i.e. '10d')
    """
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    if time is not None:
        try:
            unit = time[-1].lower()
            if unit in units:
                mult = units[unit]
                seconds = int(time[:-1]) * mult
            else:
                seconds = int(time)
        except (ValueError, TypeError):
            raise ValueError(
                f"Invalid value '{time}' for time. Use a string "
                f"with the number and optionally the time unit "
                f"(s, m, h or d)."
            )
    else:
        seconds = 0
    return seconds


class DataSize:
    """
    Data Size object with builtin unit-converted attributes.

    :param data: Data size plus optional unit string. i.e. '10m'. No
                 unit string means the data size is in bytes.
    :type data: str
    """

    __slots__ = ["_value", "_unit"]

    MULTIPLIERS = {
        "b": 1,  # 2**0
        "k": 1024,  # 2**10
        "m": 1048576,  # 2**20
        "g": 1073741824,  # 2**30
        "t": 1099511627776,
    }  # 2**40

    def __init__(self, data):
        try:
            norm_size = data.strip().lower()
            last = norm_size[-1]
            if last.isdigit():
                self._value = int(norm_size)
                self._unit = "b"
            elif last in self.MULTIPLIERS:
                self._value = int(norm_size[:-1])
                self._unit = last
            else:
                raise ValueError

            if self._value < 0:
                raise ValueError

        except ValueError:
            raise InvalidDataSize(
                'String not in size + unit format (i.e. "10M", "100k", ...)'
            )

    @property
    def value(self):
        return self._value

    @property
    def unit(self):
        return self._unit

    @property
    def b(self):
        return self._value * self.MULTIPLIERS[self._unit]

    @property
    def k(self):
        return int(self._value * self.MULTIPLIERS[self._unit] / self.MULTIPLIERS["k"])

    @property
    def m(self):
        return int(self._value * self.MULTIPLIERS[self._unit] / self.MULTIPLIERS["m"])

    @property
    def g(self):
        return int(self._value * self.MULTIPLIERS[self._unit] / self.MULTIPLIERS["g"])

    @property
    def t(self):
        return int(self._value * self.MULTIPLIERS[self._unit] / self.MULTIPLIERS["t"])