BondGraphTools/BondGraphTools

View on GitHub
BondGraphTools/actions.py

Summary

Maintainability
A
3 hrs
Test Coverage
B
86%
"""This module provides functions for the actions one wishes to perform on
bond graph models, such as creating new models and components,
connecting parts together, and setting parameters.
"""

import copy
import logging

from BondGraphTools.component_manager import get_component, base_id
from BondGraphTools.exceptions import (
    InvalidPortException, InvalidComponentException
)
from BondGraphTools.base import BondGraphBase, Bond, Port
from BondGraphTools.port_managers import PortTemplate

from .atomic import EqualFlow


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Public API of this module: the names exported by a star-import.
__all__ = [
    "new",
    "add",
    "swap",
    "connect",
    "disconnect",
    "expose",
    "remove",
    "set_param"
]


def _get_active_bonds(item):
    """Return the set of bonds attached to a component or to a single port.

    Args:
        item: Either a component (a ``BondGraphBase`` instance) or a port
            (a ``Port``, or anything that is a two-element tuple of
            ``(component, port)``).

    Returns:
        set: For a component, every bond of its parent model whose tail or
        head belongs to that component.  For a port, every bond of the
        owning component's parent model whose tail or head *is* ``item``.

    Raises:
        AssertionError: If a component is passed that has no parent model.
        InvalidPortException: If ``item`` is neither a component nor a port.
    """
    if isinstance(item, BondGraphBase):
        model = item.parent
        # Kept as an assertion so callers still see AssertionError here.
        assert model, f"{item} is not part of a model."
        return {bond for bond in model.bonds
                if bond.tail.component is item
                or bond.head.component is item}

    if isinstance(item, (tuple, Port)):
        # Only the owning component is needed to locate the model.
        component, _ = item
        # NOTE(review): ports are matched by identity, so a plain
        # ``(component, port)`` tuple only matches if the bond holds that
        # very object -- confirm whether equality was intended.
        return {bond for bond in component.parent.bonds
                if item is bond.tail or item is bond.head}

    # The original message joined two literals without a space
    # ("...a componentor port.").
    raise InvalidPortException(
        f"Cannot find {item}: Expected a component or port."
    )


def disconnect(target, other):
    """Remove every bond that joins ``target`` and ``other``.

    The bonds attached to each argument are looked up, and every bond
    common to both is removed from the model that owns the bond's tail
    component.  When the two arguments share no bond, nothing happens.

    Args:
        target (Port, BondGraphBase): One end of the connection.
        other (Port, BondGraphBase): The other end of the connection.

    Raises:
        InvalidPortException: If either argument is neither a component
            nor a port.

    See Also:
        :func:`connect`
    """
    shared_bonds = _get_active_bonds(target) & _get_active_bonds(other)

    for shared in shared_bonds:
        owner = shared.tail.component.parent
        owner.bonds.remove(shared)

def connect(source, destination):
    """Create a power bond running from ``source`` to ``destination``.

    Each argument is resolved to a port (an existing one, or a newly
    created one where the component supports it).  The bond's tail is the
    source port and its head is the destination port.  Both ports must
    belong to components living in the same model; the bond is added to
    that model.

    Args:
        source (Port or BondGraphBase):      The tail of the power bond
        destination (Port or BondGraphBase): The head of the power bond

    Raises:
        InvalidPortException: If a port cannot be resolved.
        InvalidComponentException: If either component has no parent
            model, or the two components are in different models.

    See Also:
        :func:`disconnect`
    """
    tail = _find_or_make_port(source, is_tail=True)
    head = _find_or_make_port(destination)

    tail_model = tail.component.parent
    head_model = head.component.parent

    # Guard clauses: both ends must sit in one and the same model.
    if not tail_model:
        raise InvalidComponentException(f"{tail.component} is not in a model")
    if not head_model:
        raise InvalidComponentException(f"{head.component} is not in a model")
    if tail_model is not head_model:
        raise InvalidComponentException("Components are in different models")

    tail_model.bonds.add(Bond(tail, head))


def _unpack_port_arg(port):
    """Split a port-like argument into a ``(component, port_ref)`` pair.

    Args:
        port: One of
            * a ``Port`` or two-element tuple, unpacked as
              ``(component, index)``;
            * a ``BondGraphBase`` component, meaning "no particular port";
            * a ``PortTemplate``, whose ``parent`` is the component.

    Returns:
        tuple: ``(component, idx)`` where ``idx`` is the unpacked index,
        ``None`` for a bare component, or the ``PortTemplate`` itself.

    Raises:
        AssertionError: If a ``PortTemplate`` has no parent.
        InvalidPortException: If the argument is none of the above.
    """
    if isinstance(port, (Port, tuple)):
        component, idx = port
        return component, idx

    if isinstance(port, BondGraphBase):
        return port, None

    if isinstance(port, PortTemplate):
        assert port.parent
        return port.parent, port

    # Exceptions do not apply %-formatting to their arguments, so the
    # message has to be built before it is passed in.
    raise InvalidPortException(f"Could not unpack port {port}")


def _find_or_make_port(arg, is_tail=False):
    """Resolve ``arg`` to a port, creating one if the lookup fails.

    Args:
        arg: A component or port-like object accepted by
            ``_unpack_port_arg``.
        is_tail (bool): Whether the port will be the tail of a bond.  Only
            consulted for ``EqualFlow`` components given without an
            explicit port: the ``inverting`` port is chosen for a tail and
            the ``non_inverting`` port otherwise.

    Returns:
        The port returned by ``component.get_port`` or, when that lookup
        yields ``None`` or raises ``InvalidPortException``, the port
        returned by ``component.new_port``.

    Raises:
        InvalidPortException: If the lookup failed and the fallback
            ``new_port`` call raised ``AttributeError``.
    """
    component, port = _unpack_port_arg(arg)

    if port is None and isinstance(component, EqualFlow):
        port = component.inverting if is_tail else component.non_inverting

    try:
        found = component.get_port(port)
        if found is not None:
            return found
        # A ``None`` result is treated exactly like a failed lookup.
        raise InvalidPortException
    except InvalidPortException as lookup_error:
        try:
            return component.new_port(port)
        except AttributeError:
            # Fallback unavailable: surface the original lookup failure.
            raise lookup_error


def swap(old_component, new_component):
    """Replace ``old_component`` with ``new_component`` inside its model.

    Components must be of compatible classes; a one-port cannot replace an
    n-port, for example.  Every bond touching the old component is
    re-created against a port of the new component, and the old component
    is then removed from the model.

    If the new component cannot supply a port for one of the bonds, the
    new component is taken back out of the model before the error is
    raised, so a failed port check leaves the model as it was.

    Args:
        old_component: The component to be replaced. Must already be in a
            model.
        new_component: The substitute component, which must not already be
            in that model.

    Raises:
        InvalidComponentException: If the old component has no parent
            model, the new component is already in the model, the new
            component is not a ``BondGraphBase``, or its ports are
            incompatible with the existing bonds.
        InvalidPortException: May propagate from re-connecting the bonds.
    """

    # TODO: More validation required

    model = old_component.parent
    if not model:
        raise InvalidComponentException("No parent model.")
    if new_component in model.components:
        raise InvalidComponentException(
            "Component is already in the model"
        )
    if not isinstance(new_component, BondGraphBase):
        raise InvalidComponentException("Cannot swap components")

    model.add(new_component)

    # Collect the replacements first; ``model.bonds`` must not be mutated
    # while it is being iterated.
    swaps = []
    for bond in model.bonds:
        try:
            if bond.tail.component is old_component:
                tail = new_component.get_port()
                head = bond.head
            elif bond.head.component is old_component:
                tail = bond.tail
                head = new_component.get_port()
            else:
                continue
        except InvalidPortException as ex:
            # Nothing has been rewired yet, so undo the ``add`` above
            # rather than leaving a stray component in the model.
            model.remove(new_component)
            raise InvalidComponentException(
                "Cannot swap components: Incompatible ports"
            ) from ex

        # This Bond only carries the new endpoints; ``connect`` below
        # creates the bond that is actually stored in the model.
        swaps.append((bond, Bond(tail, head)))

    for old_bond, new_bond in swaps:
        disconnect(old_bond.tail, old_bond.head)
        connect(new_bond.tail, new_bond.head)

    model.remove(old_component)


def new(component=None, name=None, library=base_id, value=None, **kwargs):
    """Create a new bond graph or component.

    Args:
        component (str or obj): The type of component to create.
            If falsy (the default), a new empty ``BondGraph`` is created.
            If a string is specified, the component will be created from
            the appropriate library.  If an existing bond graph is given,
            a shallow copy (``copy.copy``) of it is returned.
        name (str): The name for the new component.
        library (str): The library from which to find this component (if
            component is specified by string).
        value: Parameter value(s) for a library component; a list/tuple,
            dict, string or scalar, as handled by
            ``_update_build_params``.  Numeric zero is honoured.
        **kwargs: Forwarded to ``_update_build_params`` when ``value`` is
            given for a library component.

    Returns: instance of :obj:`BondGraph`

    Raises:
        NotImplementedError: If ``component`` is not a string or a
            ``BondGraphBase``.
        TypeError: Currently, whenever a truthy ``value`` is combined with
            an existing component (see the FIXME in the body).
    """

    if not component:
        cls = _find_subclass("BondGraph", BondGraphBase)
        return cls(name=name)

    if isinstance(component, str):
        build_args = get_component(component, library)
        args = ()
        if name:
            build_args["name"] = name
        # The isinstance check lets a numeric zero through even though it
        # is falsy.
        if value or isinstance(value, (int, float, complex)):
            args, build_args = _update_build_params(args,
                                                    build_args,
                                                    value,
                                                    **kwargs)
        cls = _find_subclass(
            build_args["class"], BondGraphBase
        )
        # "class" selects the type to build; it is not a constructor
        # argument.
        del build_args["class"]

        comp = cls(*args, **build_args)
        # Record where the component came from.
        comp.__component__ = component
        comp.__library__ = library
        return comp

    if isinstance(component, BondGraphBase):
        obj = copy.copy(component)
        if name:
            obj.name = name
        if value:
            # FIXME(review): this call does not match the signature
            # ``_update_build_params(args, build_args, value)`` and raises
            # TypeError.  Left unchanged because the intended update of a
            # cloned component's parameters cannot be determined from this
            # module alone.
            _update_build_params(obj.__dict__, value)

        return obj

    # Exceptions do not apply str.format to their arguments, so the
    # message has to be built before it is passed in.
    raise NotImplementedError(
        f"New not implemented for object {component}"
    )


def _update_build_params(args, build_args, value, **kwargs):

    if isinstance(value, (list, tuple)):
        assignments = zip(build_args["params"].keys(), value)

        for param, v in assignments:
            try:
                build_args["params"][param]["value"] = v
            except TypeError:
                build_args["params"][param] = v

    elif isinstance(value, dict):
        for param, v in value.items():
            try:
                if isinstance(build_args["params"][param], dict):
                    build_args["params"][param]["value"] = v
                else:
                    build_args["params"][param] = v
            except KeyError:
                build_args[param] = v

    elif isinstance(value, str):
        args = (*args, value)
    else:
        # Todo: fix me! Dirty Hacks to make PH load

        p = next(iter(build_args["params"]))
        build_args["params"][p] = value

    return args, build_args


def _find_subclass(name, base_class):

    for c in base_class.__subclasses__():
        if c.__name__ == name:
            return c
        else:
            sc = _find_subclass(name, c)
            if sc:
                return sc


def expose(component, label=None):
    """Make ``component`` available as an external port of its parent.

    A component that is not already an ``SS`` component is first swapped
    for a freshly created ``SS`` component carrying the same name.  The
    effort (``'e'``) and flow (``'f'``) ports of the ``SS`` component are
    then mapped to a port on the parent model.

    Args:
        component: The component to expose.
        label: The label to assign to the external port.  When falsy, the
            current number of ports on the parent model (as a string) is
            used.

    Raises:
        InvalidComponentException: If the component has no parent model,
            or it cannot be swapped for an ``SS`` component.
    """
    model = component.parent
    if not model:
        raise InvalidComponentException(
            f"Component {component} is not inside anything"
        )

    # fix me with metamodeles or something trickier
    if component.__component__ == "SS":
        ss = component
    else:
        ss = new("SS", name=component.name)
        try:
            swap(component, ss)
        except InvalidComponentException as ex:
            raise InvalidComponentException(f"Cannot expose {component}", ex)

    port_label = label if label else str(len(model.ports))
    model.map_port(port_label, ((ss, 'e'), (ss, 'f')))


def add(model, *args):
    """Add one or more components to ``model``.

    Args:
        model: The model to add to; its ``add`` method receives the
            components.
        *args: The component(s) to add.
    """
    components = args
    model.add(*components)


def remove(model, component):
    """Remove ``component`` from the bond graph ``model``.

    Args:
        model: The model to remove from; its ``remove`` method is called.
        component: The component to take out of the model.
    """
    discard = model.remove
    discard(component)


def set_param(component, param, value):
    """Assign ``value`` to the parameter ``param`` of ``component``.

    This delegates to the component's own ``set_param`` method.

    Args:
        component (`BondGraphBase`): The component whose parameter is set.
        param: The parameter to set.
        value: The value to assign; may be None.
    """
    assign = component.set_param
    assign(param, value)