BondGraphTools/BondGraphTools

View on GitHub
BondGraphTools/sim_tools.py

Summary

Maintainability
A
45 mins
Test Coverage
B
81%
"""Tools for running model simulations."""


import logging

import numpy as np
import sympy as sp
from scipy.optimize import broyden1
from .exceptions import ModelException

logger = logging.getLogger(__name__)


def _fetch_ic(x0, dx0, system, func, t0, eps=0.001):
    if isinstance(x0, list):
        assert len(x0) == len(system.state_vars)
        X0 = np.array(x0, dtype=np.float64)
    elif isinstance(x0, dict):
        X0 = np.array(
            [np.NaN for _ in system.state_vars], dtype=np.float64
        )
        for k, v in x0.items():
            _, idx = str(k).split('_')
            idx = int(idx)
            X0[idx] = v
    elif isinstance(x0, (int, float, complex)) and len(system.state_vars) == 1:
        X0 = np.array([x0], dtype=np.float64)
    elif isinstance(x0, np.ndarray) and x0.shape == (len(system.state_vars), ):
        X0 = x0
    else:
        raise ModelException(f"Invalid Initial Conditions: {x0}")

    if dx0 is not None:
        DX0 = np.array(dx0, dtype=np.float64)
    else:
        DX0 = np.zeros(X0.shape, dtype=np.float64)

    # if we don't have consistent initial conditions; find them if we can
    # fail if we can't

    def f(y):
        res = np.empty_like(X0)
        func(t0, X0, y, res)
        return res

    if np.linalg.norm(f(DX0)) > eps:

        DX0 = broyden1(f, DX0)
        if np.linalg.norm(f(DX0)) > 0.001:
            raise ModelException(
                f"Inconsistent initial conditions: "
                f"Could not find dx0 for the given x0 {x0}")

    return X0, DX0


def _simulate(system,
              timespan,
              x0,
              dx0=None,
              dt=0.1,
              control_vars=None):
    """Simulate the system dynamics.

    This method integrates the dynamics of the system over the specified
    interval of time, starting at the specified initial state.

    The solver used is a differential-algebraic integrator which respects
    conservation laws and algebraic constraints. It is expected that the
    initial state satisfies the systems inherent algebraic constrains;
    inconsistent initial conditions will raise exceptions.

    The initial values of derivatives can be specified and the solver will
    ensure they are consistent with the initial state, or change them if they
    are not.

    todo: detial control variables.

    Args:
        system :obj:`BondGraph`: The system to simulate
        timespan: A pair (`list` or `tuple`) containing the start and end
                  points of the simulation.
        x0: The initial conditions of the system.
        dx0 (Optional): The initial rates of change of the system. The default
                        value (`None`) indicates that the system should be
                        initialised from the state variable initial conditions.
        dt: The time step between reported (not integrated) values.
        control_vars: A `dict`, `list` or `tuple` specifing the values of the
                      control variables.
    Returns:
        t: numpy array of timesteps
        x: numpy array of state values

    Raises:
        ModelException, SolverException
    """

    if system.ports:
        raise ModelException(
            "Cannot Simulate %s: unconnected ports %s",
            system, system.ports)

    if system.control_vars and not control_vars:
        raise ModelException("Control variable not specified")

    samples = int((timespan[1] - timespan[0]) / dt) + 1
    t = np.linspace(*timespan, samples)

    res, X = _bondgraph_to_residuals(system, control_vars)

    X0, DX0 = _fetch_ic(x0, dx0, system, res, t[0])

    solver_name = 'ida'
    dae_solver = dae(solver_name, res)
    sol = dae_solver.solve(t, X0, DX0)

    return t.reshape((samples, 1)), np.transpose(sol.values.y).T


def _to_function(string, X, DX, substitutions):
    f = sp.sympify(string).subs(substitutions)

    f_n = sp.lambdify((sp.S('t'), X, DX), f, "numpy")
    return f_n


def _bondgraph_to_residuals(model, control_vars=None):
    dX = sp.IndexedBase('dX')
    X = sp.IndexedBase('X')
    U = sp.IndexedBase('U')
    x_subs = []
    dx_subs = []
    u_subs = []
    u_func = []
    n = len(model.state_vars)
    m = 0

    for i, x in enumerate(model.state_vars):
        x_subs.append((x, X[i]))
        dx_subs.append((sp.S(f'dx_{i}'), dX[i]))

    if len(model.control_vars) > 0:
        u_func_dict = {}
        u_constants = {}
        if isinstance(control_vars, list):
            u_func_dict.update({
                i: f for i, f in enumerate(control_vars)}
            )
        elif isinstance(control_vars, dict):
            u_func_dict.update({
                int(v[2:]): f for v, f in control_vars.items()
            })
        elif len(model.control_vars) == 1:
            u_func_dict[0] = control_vars
        else:
            raise TypeError(f"Control argument {control_vars} is invalid")

        test_x = np.zeros(shape=(n,), dtype=np.float32)
        for idx, f in u_func_dict.items():
            try:
                if isinstance(f, (float, int, sp.Number)):
                    u_constants[idx] = f
                    continue
                if isinstance(f, str):
                    f = _to_function(f, X, dX, dx_subs + x_subs)
                    u_func_dict[idx] = f
                if n == 1:
                    r = f(0, 0, 0)
                else:
                    r = f(0, test_x, test_x)
                assert isinstance(r, (float, int, sp.Number)
                                  ), "Invalid output from control"
            except Exception:
                message = f"Invalid control function for var: u_{idx}.\n " \
                    "Control functions should be of the form:\n" \
                    f"u_{idx} = f(t, x, dx/dt)"

                raise ModelException(message)

        for i, u in enumerate(model.control_vars):
            if i in u_constants:
                u_subs.append((u, u_constants[i]))
                continue
            u_subs.append((u, U[m]))
            try:
                u_func.append(u_func_dict[i])
            except KeyError:
                raise ModelException(f"Control variable {u} must be specified")
            m += 1
    rels = [r.subs(dx_subs).subs(x_subs).subs(u_subs)
            for r in model.constitutive_relations]

    if len(rels) != n:

        raise ModelException(
            "Model simplification error: system is under-determined")

    Fsym = sp.symarray('F', shape=n)
    for i, r in enumerate(rels):
        Fsym[i] = r

    t = sp.S('t')

    if not u_func:
        F = sp.lambdify((t, X, dX), Fsym)

        def residual(_t, _x, _dx, _res):
            _r = F(_t, _x, _dx)
            for i in range(n):
                _res[i] = _r[i]
    else:

        Fsym_u = sp.lambdify((t, X, dX, U), Fsym)

        def residual(_t, _x, _dx, _res):
            _u = [u_f(_t, _x, _dx) for u_f in u_func]
            _r = Fsym_u(_t, _x, _dx, _u)
            for i in range(n):
                _res[i] = _r[i]
    return residual, X


try:
    from scikits.odes.dae import dae
    simulate = _simulate
except ImportError:
    print("Warning - scikit.odes not found. Simulations are disabled.")

    from BondGraphTools.exceptions import SolverException

    def simulate(*args, **kwargs):
        raise SolverException(
            "library `scikits.odes` could not be imported "
            "but is required for simulations")
finally:
    simulate.__doc__ = _simulate.__doc__