julienmalard/Tinamit

View on GitHub
tinamit0/envolt/mds/pysd/_envolt.py

Summary

Maintainability
A
3 hrs
Test Coverage
from ast import literal_eval

import numpy as np
import pandas as pd
from tinamit.calibs.sintx.ec import Ecuación
from tinamit.envolt.mds.pysd._funcs import decodar

from ._funcs import gen_mod_pysd, obt_paso_mod_pysd
from ._vars import VarPySDAuxiliar, VarPySDNivel, VarPySDConstante, VariablesPySD
from .._envolt import ModeloDS


class ModeloPySD(ModeloDS):
    """
    Envoltura para modelos PySD.
    """
    ext = ['.mdl', '.xmile', '.xml', '.py']

    def __init__(símismo, archivo, nombre='mds'):
        símismo.mod = gen_mod_pysd(archivo)
        símismo.vars_para_cambiar = {}
        símismo.archivo = archivo

        símismo.cont_simul = False
        símismo.paso_act = 0

        super().__init__(variables=_gen_vars(símismo.mod), nombre=nombre)

    def iniciar_modelo(símismo, corrida):
        # Poner los variables y el tiempo a sus valores iniciales
        símismo.mod.reload()
        símismo.paso_act = 0

        símismo.cont_simul = False

        super().iniciar_modelo(corrida)

    def incrementar(símismo, rebanada):

        símismo.paso_act += rebanada.n_pasos
        devolv = [símismo.paso_act - 1, símismo.paso_act]
        res = símismo.mod.run(
            initial_condition='current' if símismo.cont_simul else 'original', params=símismo.vars_para_cambiar,
            return_timestamps=devolv, return_columns=[v.nombre_py for v in rebanada.resultados.variables()]
        )

        # Guardar valores iniciales
        for v in rebanada.resultados.variables():
            rebanada.resultados[v].vals[símismo.paso_act - 1] = res[v.nombre_py][símismo.paso_act - 1]

        símismo.cont_simul = True
        res_interés = {v: res[v.nombre_py].values[-1] for v in rebanada.resultados.variables()}
        símismo.variables.cambiar_vals(
            {ll: v.item() if isinstance(v, np.ndarray) and v.shape == () else v for ll, v in res_interés.items()}
        )

        símismo.vars_para_cambiar.clear()
        super().incrementar(rebanada)

    def _correr_hasta_final(símismo):
        t = símismo.corrida.t

        t_inic_mod = símismo.mod.time._t
        eje_pysd = np.arange(
            t_inic_mod, t_inic_mod + t.n_pasos * t.tmñ_paso + 1, t.guardar_cada * t.tmñ_paso
        )

        if símismo.corrida.extern is not None:
            vals_extern = símismo.corrida.extern.obt_vals(t.eje(), var=símismo.variables)
            paráms = {
                vr: vl.squeeze().to_pandas()
                for vr, vl in vals_extern.items()
            }
        else:
            paráms = {}

        if símismo.corrida.clima is not None:
            vars_clima = símismo.corrida.clima.obt_todos_vals(t.eje(), vars_clima=símismo.vars_clima)
            paráms.update(vars_clima)
        for ll, v in paráms.items():
            if isinstance(v, pd.Series):
                v.index = eje_pysd[t.eje().isin(v.index.values)]

        símismo._proc_paráms(paráms)
        iniciales = {}
        niveles = símismo.variables.niveles()
        for vr in list(paráms):
            for nv in niveles:
                if vr in nv.parientes:
                    iniciales[str(nv)] = paráms[vr][0]
                    break

        res_pysd = símismo.mod.run(
            params=paráms,
            initial_condition='o',
            return_timestamps=eje_pysd
        )
        return {vr: res_pysd[str(vr)].values for vr in símismo.corrida.resultados.variables()}

    def unidad_tiempo(símismo):
        return obt_paso_mod_pysd(símismo.mod)

    def cambiar_vals(símismo, valores):
        símismo.vars_para_cambiar.update({vr: float(vl) for vr, vl in valores.items()})
        super().cambiar_vals(valores)

    def paralelizable(símismo):
        return True

    def cerrar(símismo):
        super().cerrar()
        símismo.vars_para_cambiar.clear()

    def _proc_paráms(símismo, prms):
        for p in list(prms):
            v = símismo.variables[p]
            if isinstance(v, VarPySDNivel):
                val = prms.pop(p)
                if v.var_inic:
                    prms[v.var_inic.nombre] = val


def _gen_vars(mod):
    l_vars = []

    internos = ['FINAL TIME', 'TIME STEP', 'SAVEPER', 'INITIAL TIME']
    for i, f in mod.doc().iterrows():

        nombre = f['Real Name']
        if f['Type'] == 'lookup' or nombre in internos:
            continue

        nombre_py = f['Py Name']
        unid = decodar(f['Unit'])
        líms = literal_eval(f['Lims'])
        ec = decodar(f['Eqn'])
        info = decodar(f['Comment'])
        obj_ec = Ecuación(ec, dialecto='vensim')
        parientes = {v for v in obj_ec.variables() if v not in internos}
        inic = getattr(mod.components, nombre_py)()

        try:
            getattr(mod.components, '_integ_' + nombre_py)
            nivel = True

        except AttributeError:
            nivel = False
        if nivel:
            var = VarPySDNivel(
                nombre, nombre_py=nombre_py, unid=unid, ec=ec, parientes=parientes, inic=inic, líms=líms, info=info
            )
        elif len(parientes):
            var = VarPySDAuxiliar(
                nombre, nombre_py=nombre_py, unid=unid, ec=ec, parientes=parientes, inic=inic, líms=líms, info=info
            )
        else:
            var = VarPySDConstante(
                nombre, nombre_py=nombre_py, unid=unid, ec=ec, parientes=parientes, inic=inic, líms=líms, info=info
            )

        l_vars.append(var)

    for vr in l_vars:
        if isinstance(vr, VarPySDNivel):
            # para hacer: no funciona con XMILE, solamente con vensim (.mdl)
            args_integ = Ecuación(vr.ec).sacar_args_func('INTEG', 2)
            if args_integ:
                arg_inic = args_integ[1]
                vr.estab_var_inic(next((
                    v for v in l_vars if v.nombre == arg_inic
                ), None))

    return VariablesPySD(l_vars)