KarrLab/wc_utils

View on GitHub
wc_utils/util/misc.py

Summary

Maintainability
C
1 day
Test Coverage
C
73%
""" Miscellaneous utilities.

:Author: Jonathan Karr <karr@mssm.edu>
:Author: Arthur Goldberg <Arthur.Goldberg@mssm.edu>
:Date: 2016-11-05
:Copyright: 2016-2018, Karr Lab
:License: MIT
"""

from fractions import Fraction
import collections.abc
import copy
import dataclasses
import math
import os
import pickle
import socket
import sys


def isclass(cls, cls_info):
    """Compares a class with classes in `cls_info`.

    Args:
        cls (:obj:`str`): class
        cls_info (:obj:`class`, :obj:`type`, or :obj:`tuple` of classes and types): class, type, or
            tuple of classes and types

    Returns:
        :obj:`bool`: True if one of the classes in `cls_info` is `cls`.
    """
    return isclass_by_name(most_qual_cls_name(cls), cls_info)


def isclass_by_name(cls_name, cls_info):
    """Compares a class name with the names of the classes in `cls_info`.

    Args:
        cls_name (:obj:`str`): class name
        cls_info (:obj:`class`, :obj:`type`, or :obj:`tuple` of classes and types): class, type, or
            tuple of classes and types

    Returns:
        :obj:`bool`: True if one of the classes in `cls_info` has name `cls_name`.
    """
    if not isinstance(cls_info, tuple):
        cls_info = (cls_info,)

    for a_cls_info in cls_info:
        if isinstance(a_cls_info, tuple):
            if isclass_by_name(cls_name, a_cls_info):
                return True
        elif cls_name == most_qual_cls_name(a_cls_info):
            return True

    return False


def most_qual_cls_name(obj):
    """ Obtain the most qualified class name available for `obj`.

    Since references to classes cannot be sent in messages that leave an address space,
    use the most qualified class name available to compare class values across address spaces.
    Fully qualified class names are available for Python >= 3.3.

    Args:
        obj (:obj:`class`): an object, which may be a class.

    Returns:
        :obj:`str`: the most qualified class name available for `obj`.
    """
    if isinstance(obj, type):
        cls = obj
    else:
        cls = obj.__class__

    if (3, 3) <= sys.version_info:
        return cls.__module__ + '.' + cls.__qualname__
    else:
        return cls.__module__ + '.' + cls.__name__  # pragma: no cover # old Python


def round_direct(value, precision=2):
    """ Convert `value` to rounded string with appended sign indicating the rounding direction.

    Append '+' to indicate that `value` has been rounded down, and '-' to indicate rounding up.
    For example,
    round_direct(3.01, 2) == '3.01'
    round_direct(3.01, 1) == '3.0+'
    round_direct(2.99, 1) == '3.0-'

    This function helps display simulation times that have been slightly increased or decreased to
    control order execution.

    Args:
        value (float): the value to round.
        precision (int): the precision with which to round `value`.

    Returns:
        str: `value` rounded to `precision` places, followed by a sign indicating rounding direction.
    """
    if round(value, precision) == value:
        return str(round(value, precision))
    elif round(value, precision) < value:
        return '{}+'.format(round(value, precision))
    else:   # value < round(value, precision)
        return '{}-'.format(round(value, precision))


def quote(s):
    """ Enclose a string that contains spaces in single quotes, 'like this'

    Args:
        s (:obj:`object`): a string

    Returns:
        :obj:`str`: a string
    """
    s = str(s)
    if ' ' in s:
        return "'{}'".format(s)
    else:
        return s


def obj_to_str(obj, attrs):
    """ Provide a string representation of an object

    Args:
        obj (:obj:`object`): an object
        attrs (:obj:`collections.abc.Iterator`): the names of attributes in `obj` to represent

    Returns:
        :obj:`str`: a string
    """
    rv = ['\nClass: ' + obj.__class__.__name__]
    for attr in attrs:
        if hasattr(obj, attr):
            rv.append("{}: {}".format(attr, str(getattr(obj, attr))))
        else:
            rv.append("{}: --not defined--".format(attr))
    return '\n'.join(rv)


def as_dict(obj):
    """ Provide a dictionary representation of `obj`

    `obj` must define an attribute called `ATTRIBUTES` which iterates over the attributes that
    should be included in the representation.

    Recursively computes `as_dict()` on nested objects that define `ATTRIBUTES`. Warning: calling
    `as_dict` on cyclic networks of objects will cause infinite recursion and stack overflow.

    Returns:
        :obj:`dict`: a representation of `obj` mapping attribute names to values, nested for nested
            objects

    Raises:
        :obj:`ValueError`: `obj` does not define an attribute called `ATTRIBUTES`
    """
    if not hasattr(obj, 'ATTRIBUTES'):
        raise ValueError('obj must define the attribute ATTRIBUTES')
    d = {}
    for attr in obj.ATTRIBUTES:
        contained_obj = getattr(obj, attr)
        if hasattr(contained_obj, 'ATTRIBUTES'):
            d[attr] = as_dict(contained_obj)
        else:
            d[attr] = getattr(obj, attr)
    return d


def internet_connected():
    """ Determine whether the Internet is connected

    Returns:
        :obj:`bool`: return `True` if the internet (actually www.google.com) is accessible, `False` otherwise
    """
    try:
        # connect to the host -- tells us if the host is actually reachable
        socket.create_connection(("www.google.com", 80))
        return True
    except OSError: # pragma: no cover
        pass
    return False    # pragma: no cover


class OrderableNoneType(object):
    """ Type that can be used for sorting in Python 3 in place of :obj:`None` """

    def __lt__(self, other):
        return (other is not self) and (other is not None)

    def __le__(self, other):
        return True

    def __eq__(self, other):
        return (other is self) or (other is None)

    def __ge__(self, other):
        return (other is self) or (other is None)

    def __gt__(self, other):
        return False


# Object that can be used for sorting in Python 3 in place of :obj:`None`
OrderableNone = OrderableNoneType()


def geometric_iterator(min, max, factor):
    """ Create a geometic sequence

    Generate the sequence `min`, `min`*`factor`, `min`*`factor`**2, ..., stopping at the first
    element greater then or equal to `max`.

    Args:
        min (:obj:`float`): first and smallest element of the geometic sequence
        max (:obj:`float`): largest element of the geometic sequence
        factor (:obj:`float`): multiplicative factor between sequence entries

    Returns:
        :obj:`iterator` of :obj:`float`: the geometic sequence

    Raises:
        :obj:`ValueError`: if `min` <= 0, or
            if `max` < `min`, or
            if `factor` <= 1
    """
    if not 0 < min:
        raise ValueError(f'min = {min}; 0 < min is required')
    if max < min:
        raise ValueError(f'min = {min} and max = {max}; min <= max is required')
    if factor <= 1:
        raise ValueError(f'factor = {factor}; 1 < factor is required')
    sequence_value = min
    while sequence_value < max or math.isclose(sequence_value, max, rel_tol=1E-14):
        yield sequence_value
        sequence_value *= factor


class DFSMAcceptor(object):
    """ Deterministic finite state machine (DFSM) that accepts sequences which move from the start to the end state

    A data-driven finite state machine (finite-state automaton). States and messages can be any
    hashable type.

    Attributes:
        start_state (:obj:`object`): a DFSM's start state
        accepting_state (:obj:`object`): a DFSM must be in this state to accept a message sequence
        transitions_dict (:obj:`dict`): transitions, a map state -> message -> next state
        state (:obj:`object`): a DFSM's current state
    """

    # acceptance fails
    FAIL = 'fail'
    # acceptance succeeds
    ACCEPT = 'accept'

    def __init__(self, start_state, accepting_state, transitions):
        """
        Args:
            start_state (:obj:`object`): a DFSM's start state
            accepting_state (:obj:`object`): a DFSM must be in this state to accept a message sequence
            transitions (:obj:`iterator` of `tuple`): transitions, an iterator of
                (state, message, next state) tuples

        Raises:
            :obj:`ValueError`: if `transitions` contains redundant transitions, or if no transitions
                out of `start_state` are provided
        """
        self.start_state = start_state
        self.accepting_state = accepting_state
        self.transitions_dict = {}
        for state, transition_message, new_state in transitions:
            if state not in self.transitions_dict:
                self.transitions_dict[state] = {}
            if transition_message in self.transitions_dict[state]:
                raise ValueError("'{}' already a transition from '{}'".format(transition_message,
                    state))
            self.transitions_dict[state][transition_message] = new_state
        if start_state not in self.transitions_dict:
            raise ValueError("no transitions available from start state '{}'".format(start_state))
        self.reset()

    def reset(self):
        """ Reset a DFSM to it's start state
        """
        self.state = self.start_state

    def get_state(self):
        """ Get a DFSM's state
        """
        return self.state

    def exec_transition(self, message):
        """ Execute one DFSM state transition

        Args:
            message (:obj:`object`): a message that might transition the DFSM to another state

        Returns:
            :obj:`object`: returns `DFSMAcceptor.FAIL` if `message` does not transition the DFSM to
                another state; otherwise returns `None`
        """
        if message not in self.transitions_dict[self.state]:
            return DFSMAcceptor.FAIL
        self.state = self.transitions_dict[self.state][message]

    def run(self, transition_messages):
        """ Execute one DFSM state transition

        Args:
            transition_messages (:obj:`iterator` of `object`): an iterator that provides messages that
                might transition a DFSM from its `start_state` to its `accepting_state`

        Returns:
            :obj:`object`: returns `DFSMAcceptor.FAIL` if `transition_messages` do not transition the
                DFSM to from its `start_state` to its `accepting_state`; otherwise returns `DFSMAcceptor.ACCEPT`
        """
        self.reset()
        for transition_message in transition_messages:
            rv = self.exec_transition(transition_message)
            if rv == DFSMAcceptor.FAIL:
                return DFSMAcceptor.FAIL
        if self.state == self.accepting_state:
            return DFSMAcceptor.ACCEPT
        return DFSMAcceptor.FAIL


class EnhancedDataClass(object):
    """ A class that enhances dataclasses

    Attributes:
        LIKELY_INITIAL_VOWEL_SOUNDS (:obj:`set` of :obj:`str`): initial letters of words that will
            be preceeded by 'an'
        DO_NOT_PICKLE (:obj:`set` of :obj:`str`): fields in a dataclass that cannot be pickled
    """

    LIKELY_INITIAL_VOWEL_SOUNDS = {'a', 'e', 'i', 'o', 'u'}
    DO_NOT_PICKLE = set()

    def validate_dataclass_type(self, attr_name):
        """ Validate the type of an attribute in a dataclass instance

        Args:
            attr_name (:obj:`str`): the name of the attribute to validate

        Returns:
            :obj:`None`: if no error is found

        Raises:
            :obj:`ValueError`: if `attr_name` is not the name of a field
            :obj:`TypeError`: if attribute `attr_name` does not have the right type
        """
        fields_map = {field.name: field for field in dataclasses.fields(self)}
        if attr_name not in fields_map:
            raise ValueError(f"'{attr_name}' must be a field in {self.__class__.__name__}")

        # validate type
        field = fields_map[attr_name]
        attr = getattr(self, field.name)

        # place the right article before a type name, approximately
        single_article = 'a'
        if field.type.__name__[0].lower() in self.LIKELY_INITIAL_VOWEL_SOUNDS:
            single_article = 'an'

        # accept int inputs to float fields
        if isinstance(attr, int) and field.type is float:
            attr = float(attr)
            setattr(self, field.name, attr)

        # dataclasses._MISSING_TYPE is the value used for default if no default is provided
        if 'dataclasses._MISSING_TYPE' in str(field.default):
            if not isinstance(attr, field.type):
                raise TypeError(f"{field.name} ('{attr}') must be {single_article} {field.type.__name__}")
        else:
            if (field.default is None and attr is not None) or field.default is not None:
                if not isinstance(attr, field.type):
                    raise TypeError(f"{field.name} ('{attr}') must be {single_article} {field.type.__name__}")

    def validate_dataclass_types(self):
        """ Validate the types of all attributes in a dataclass instance

        Returns:
            :obj:`None`: if no error is found

        Raises:
            :obj:`error_type`: if an attribute does not have the right type
        """
        # validate types
        for field in dataclasses.fields(self):
            self.validate_dataclass_type(field.name)

    def __setattr__(self, name, value):
        """ Validate a dataclass attribute when it is changed """
        object.__setattr__(self, name, value)
        self.validate_dataclass_type(name)

    def prepare_to_pickle(self):
        """ Provide a copy of this instance that can be pickled; recursively calls nested :obj:`EnhancedDataClass`\ s

        Some objects, such as functions, cannot be pickled. Replace the value of these attributes with :obj:`None`.

        Returns:
            :obj:`SimulationConfig`: a copy of `self` that can be pickled
        """
        to_pickle = copy.deepcopy(self)
        for field in dataclasses.fields(self):
            attr = getattr(self, field.name)
            if field.name in self.DO_NOT_PICKLE:
                setattr(to_pickle, field.name, None)
            elif isinstance(attr, EnhancedDataClass):
                setattr(to_pickle, field.name, attr.prepare_to_pickle())
        return to_pickle

    @classmethod
    def write_dataclass(cls, dataclass, dirname):
        """ Save an `EnhancedDataClass` object to the directory `dirname`

        Args:
            dataclass (:obj:`EnhancedDataClass`): an `EnhancedDataClass` instance
            dirname (:obj:`str`): directory for holding the dataclass

        Raises:
            :obj:`ValueError`: if a dataclass has already been written to `dirname`
        """
        pathname = cls.get_pathname(dirname)
        if os.path.isfile(pathname):
            raise ValueError(f"'{pathname}' already exists")

        with open(pathname, 'wb') as file:
            pickle.dump(dataclass.prepare_to_pickle(), file)

    @classmethod
    def read_dataclass(cls, dirname):
        """ Read an `EnhancedDataClass` object from the directory `dirname`

        Args:
            dirname (:obj:`str`): directory for holding the dataclass

        Returns:
            :obj:`EnhancedDataClass`: an `EnhancedDataClass` object
        """
        pathname = cls.get_pathname(dirname)

        # load and return this EnhancedDataClass
        with open(pathname, 'rb') as file:
            return pickle.load(file)

    @staticmethod
    def get_pathname(dirname):
        """ Get the pathname for a pickled :obj:`EnhancedDataClass` object stored in directory `dirname`

        Subclasses of :obj:`EnhancedDataClass` that read or write files must override this method.

        Args:
            dirname (:obj:`str`): directory for holding the dataclass

        Returns:
            :obj:`str`: pathname for the :obj:`EnhancedDataClass`
        """
        raise ValueError(f"subclasses of EnhancedDataClass that read or write files must define get_pathname method")
        return os.path.join(dirname, 'filename goes here')  # pragma: no cover

    def semantically_equal(self, other):
        """ Evaluate whether two instances of an :obj:`EnhancedDataClass` subclass are semantically equal

        Defaults to `self == other` if not overridden. Otherwise, should return :obj:`True` if
        `self` and `other` are semantically equal, and :obj:`False` otherwise.
        By default, `dataclasses` are created with `__eq__` methods that compare all attributes.

        Args:
            other (:obj:`Object`): other object

        Returns:
            :obj:`bool`: :obj:`True` if `other` is semantically equal to `self`, :obj:`False` otherwise
        """
        return self == other