BondGraphTools/actions.py
"""This module provides functions for the actions one wishes to perform on
models bond graph models such as creating new models and components,
connecting parts together, and setting parameters.
"""
import copy
import logging
from BondGraphTools.component_manager import get_component, base_id
from BondGraphTools.exceptions import (
InvalidPortException, InvalidComponentException
)
from BondGraphTools.base import BondGraphBase, Bond, Port
from BondGraphTools.port_managers import PortTemplate
from .atomic import EqualFlow
logger = logging.getLogger(__name__)
__all__ = [
"new",
"add",
"swap",
"connect",
"disconnect",
"expose",
"remove",
"set_param"
]
def _get_active_bonds(item):
if isinstance(item, BondGraphBase):
model = item.parent
assert model, f"{item} is not part of a model."
return {bond for bond in model.bonds
if bond.tail.component is item
or bond.head.component is item}
if isinstance(item, tuple) or isinstance(item, Port):
component, port = item
return {bond for bond in component.parent.bonds
if item is bond.tail or item is bond.head}
raise InvalidPortException(f"Cannot find {item}: Expected a component"
"or port.")
def disconnect(target, other):
"""
Disconnects the flow of energy between the two components or ports.
If there is no connection, this method does nothing.
Args:
target (Port, BondGraphBase):
other (Port, BondGraphBase):
Raises:
InvalidComponentException
See Also:
:func:`connect`
"""
target_bonds = _get_active_bonds(target)
other_bonds = _get_active_bonds(other)
for bond in target_bonds.intersection(other_bonds):
model = bond.tail.component.parent
model.bonds.remove(bond)
def connect(source, destination):
"""Connects two components or ports.
Defines a power bond between the source and destination ports such that
the bond tail is at the source, and the bond head is at the destination.
We assume that either the source and/or destination is or has a free
port.
Args:
source (Port or BondGraphBase): The tail of the power bond
destination (Port or BondGraphBase): The head of the power bond
Raises:
InvalidPortException, InvalidComponentException
See Also:
:func:`disconnect`
"""
tail = _find_or_make_port(source, is_tail=True)
head = _find_or_make_port(destination)
model = tail.component.parent
model_prime = head.component.parent
if not model:
raise InvalidComponentException(f"{tail.component} is not in a model")
elif not model_prime:
raise InvalidComponentException(f"{head.component} is not in a model")
elif model is not model_prime:
raise InvalidComponentException("Components are in different models")
bond = Bond(tail, head)
model.bonds.add(bond)
def _unpack_port_arg(port):
# returns (component, idx)
if isinstance(port, (Port, tuple)):
c, idx = port
return c, idx
if isinstance(port, BondGraphBase):
return port, None
if isinstance(port, PortTemplate):
assert port.parent
return port.parent, port
raise InvalidPortException("Could not unpack port %s", port)
def _find_or_make_port(arg, is_tail=False):
component, port = _unpack_port_arg(arg)
if isinstance(component, EqualFlow) and port is None:
if is_tail:
port = component.inverting
else:
port = component.non_inverting
try:
p = component.get_port(port)
if p is None:
raise InvalidPortException
return p
except InvalidPortException as ex:
try:
return component.new_port(port)
except AttributeError:
raise ex
def swap(old_component, new_component):
"""
Replaces the old component with a new component.
Components must be of compatible classes; 1 one port cannot replace an
n-port, for example.
The old component will be completely removed from the system model.
Args:
old_component: The component to be replaced. Must already be in the
model.
new_component: The substitute component which must not be in the
model
Raises:
InvalidPortException, InvalidComponentException
"""
# TODO: More validation required
def is_swap_valid(old_comp, new_comp): # noqa
if not isinstance(new_comp, BondGraphBase):
return False
return True
model = old_component.parent
if not model:
raise InvalidComponentException("No parent model.")
elif new_component in model.components:
raise InvalidComponentException(
"Component is already in the model"
)
elif not is_swap_valid(old_component, new_component):
raise InvalidComponentException("Cannot swap components")
model.add(new_component)
swaps = []
for bond in model.bonds:
try:
if bond.tail.component is old_component:
tail = new_component.get_port()
head = bond.head
elif bond.head.component is old_component:
tail = bond.tail
head = new_component.get_port()
else:
continue
except InvalidPortException:
raise InvalidComponentException(
"Cannot swap components: Incompatible ports"
)
swaps.append((bond, Bond(tail, head)))
for old_bond, new_bond in swaps:
disconnect(old_bond.tail, old_bond.head)
connect(new_bond.tail, new_bond.head)
model.remove(old_component)
def new(component=None, name=None, library=base_id, value=None, **kwargs):
"""
Creates a new Bond Graph from a library component.
Args:
component(str or obj): The type of component to create.
If a string is specified, the the component will be created from the
appropriate libaray. If an existing bond graph is given, the bond
graph will be cloned.
name (str): The name for the new component
library (str): The library from which to find this component (if
component is specified by string).
value:
Returns: instance of :obj:`BondGraph`
Raises: NotImplementedError
"""
if not component:
cls = _find_subclass("BondGraph", BondGraphBase)
return cls(name=name)
elif isinstance(component, str):
build_args = get_component(component, library)
args = ()
if name:
build_args.update({"name": name})
if value or isinstance(value, (int, float, complex)):
args, build_args = _update_build_params(args,
build_args,
value,
**kwargs)
cls = _find_subclass(
build_args["class"], BondGraphBase
)
del build_args["class"]
comp = cls(*args, **build_args)
comp.__component__ = component
comp.__library__ = library
return comp
elif isinstance(component, BondGraphBase):
obj = copy.copy(component)
if name:
obj.name = name
if value:
_update_build_params(obj.__dict__, value)
return obj
else:
raise NotImplementedError(
"New not implemented for object {}", component
)
def _update_build_params(args, build_args, value, **kwargs):
if isinstance(value, (list, tuple)):
assignments = zip(build_args["params"].keys(), value)
for param, v in assignments:
try:
build_args["params"][param]["value"] = v
except TypeError:
build_args["params"][param] = v
elif isinstance(value, dict):
for param, v in value.items():
try:
if isinstance(build_args["params"][param], dict):
build_args["params"][param]["value"] = v
else:
build_args["params"][param] = v
except KeyError:
build_args[param] = v
elif isinstance(value, str):
args = (*args, value)
else:
# Todo: fix me! Dirty Hacks to make PH load
p = next(iter(build_args["params"]))
build_args["params"][p] = value
return args, build_args
def _find_subclass(name, base_class):
for c in base_class.__subclasses__():
if c.__name__ == name:
return c
else:
sc = _find_subclass(name, c)
if sc:
return sc
def expose(component, label=None):
"""
Exposes the component as port on the parent.
If the target component is not a SS component, it is replaced with a new
SS component.
A new external port is added to the parent model, and connected to the
SS component.
Args:
component: The component to expose.
label: The label to assign to the external port
Raises: InvalidComponentException
"""
model = component.parent
if not model:
raise InvalidComponentException(
f"Component {component} is not inside anything"
)
# fix me with metamodeles or something trickier
if component.__component__ != "SS":
ss = new("SS", name=component.name)
try:
swap(component, ss)
except InvalidComponentException as ex:
raise InvalidComponentException(f"Cannot expose {component}", ex)
else:
ss = component
effort_port = (ss, 'e')
flow_port = (ss, 'f')
if not label:
label = str(len(model.ports))
model.map_port(label, (effort_port, flow_port))
def add(model, *args):
"""
Add the specified component(s) to the model
"""
model.add(*args)
def remove(model, component):
"""Removes the specified components from the Bond Graph model.
"""
model.remove(component)
def set_param(component, param, value):
"""
Sets the specified parameter to a particular value.
Args:
component (`BondGraphBase`): The particular component.
param: The parameter to set
value: The value to assign it to, may be None
"""
component.set_param(param, value)