julienmalard/Tinamit

View on GitHub
src/tinamit/simul.py

Summary

Maintainability
A
0 mins
Test Coverage
from collections import Counter
from typing import Iterable, List, Dict, Optional, Union

import numpy as np
import pandas as pd
import trio
import xarray as xr

from .hilo import Hilo
from .rebanada import Rebanada
from .resultados import Resultados
from .tiempo import conv_tiempo, UnidTiempo
from .utils import EJE_TIEMPO


class Simulación(object):
    def __init__(símismo, hilos: Iterable[Hilo], unid_tiempo: UnidTiempo, resultados: Dict[str, Resultados]):
        dups = [nmbr for nmbr, i in Counter([str(h) for h in hilos]).items() if i > 1]
        if dups:
            raise ValueError("Nombres de modelos duplicados:\n\t{}".format("\n\t".join(dups)))

        símismo.hilos: Dict[str, Hilo] = {str(h): h for h in hilos}
        símismo.resultados = resultados
        símismo.unid_tiempo = unid_tiempo

    def simular(símismo) -> Dict[str, Resultados]:
        return trio.run(símismo.simular_asinc)

    def iniciar(símismo):
        trio.run(símismo.iniciar_asinc)

    def incr(símismo, n_pasos):
        trio.run(símismo.incr_asinc, n_pasos)

    def correr(símismo):
        trio.run(símismo.correr_asinc)

    def cerrar(símismo):
        trio.run(símismo.cerrar_asinc)

    async def simular_asinc(símismo) -> Dict[str, Resultados]:
        await símismo.iniciar_asinc()
        await símismo.correr_asinc()
        await símismo.cerrar_asinc()

        return símismo.resultados

    async def iniciar_asinc(símismo):
        async with trio.open_nursery() as grupo:
            for h in símismo.hilos.values():
                rebanada = Rebanada(
                    n_pasos=0,
                    tiempo=h.tiempo,
                    resultados=símismo.resultados[str(h)],
                    externos=símismo._gen_externos(h, 0)
                )
                grupo.start_soon(h.iniciar, rebanada)

    async def incr_asinc(símismo, por: Union[int, pd.Timestamp], hilos: Optional[Dict[str, Hilo]] = None):
        hilos = hilos or símismo.hilos

        async with trio.open_nursery() as grupo:
            for h in hilos.values():
                if isinstance(por, int):
                    n_pasos_h = conv_tiempo(por, de=símismo.unid_tiempo, a=h.tiempo.unids, t=h.tiempo.ahora)
                else:
                    n_pasos_h = h.tiempo.eje.get_loc(por) - h.tiempo.paso

                rebanada = Rebanada(
                    n_pasos=n_pasos_h,
                    tiempo=h.tiempo,
                    resultados=símismo.resultados[str(h)],
                    externos=símismo._gen_externos(h, n_pasos_h)
                )
                grupo.start_soon(h.incr, rebanada)

    async def correr_asinc(símismo):
        while q := símismo.quedan:
            próximo_t = {str(h): h.próximo_tiempo() for h in q}
            f_máxima = max(próximo_t.values())
            próximos = {str(h): h for h in q if próximo_t[str(h)] == f_máxima and próximo_t[str(h)] != h.tiempo.ahora}

            if not próximos:
                raise RuntimeError('Iteración infinita entre {}'.format(', '.join([str(h) for h in q])))

            await símismo.incr_asinc(por=f_máxima, hilos=próximos)

    async def cerrar_asinc(símismo):
        async with trio.open_nursery() as grupo:
            for h in símismo.hilos.values():
                grupo.start_soon(h.cerrar)

    @property
    def quedan(símismo) -> List[Hilo]:
        return [h for h in símismo.hilos.values() if not h.tiempo.terminado]

    def _gen_externos(símismo, hilo: Hilo, n_pasos: int) -> xr.Dataset:
        tiempo = hilo.tiempo

        externos: xr.Dataset = xr.Dataset(
            {
                str(req.var_recep): xr.DataArray(
                    np.nan,
                    coords={EJE_TIEMPO: tiempo.eje[tiempo.paso: tiempo.paso + n_pasos], **req.var_recep.coords},
                    dims=[EJE_TIEMPO, *req.var_recep.dims]
                ) for req in hilo.requísitos
            }
        )
        for req in hilo.requísitos:
            res = símismo.resultados[str(req.hilo_fuente)].valores[str(req.var_fuente)]
            externo = req.recortar(tiempo, n_pasos=n_pasos, res=res)
            externos = externos.fillna(externo)

        externos = externos.ffill(EJE_TIEMPO)
        return externos