julienmalard/Tikon

View on GitHub
tikon/central/matriz.py

Summary

Maintainability
D
2 days
Test Coverage
import functools
from typing import Any, Optional, Union, List

import numpy as np
import pandas as pd
import xarray as xr
from frozendict import frozendict


@functools.lru_cache
def _calc_índices(_dic, _coords, _dims):
    índices = []
    í_crds = {ll: list(range(len(v))) for ll, v in _coords.items()}
    for dm in _dims:
        if dm in _dic:
            v = _dic[dm]
            if isinstance(v, tuple):
                if v == tuple(range(_dic[dm][0], _dic[dm][-1] + 1)):
                    índices.append(slice(v[0], v[-1] + 1) if v != í_crds[dm] else slice(None))
                else:
                    índices.append(v)
            else:
                índices.append(v)
        else:
            índices.append(slice(None))
    return tuple(índices)


@functools.lru_cache
def _calc_índices_loc(coords, dic):
    return frozendict({
        dm: tuple(coords[dm].index(c) for c in (crds if isinstance(crds, (tuple, set, list)) else [crds]))
        for dm, crds in dic.items()
    })


@functools.lru_cache
def _proc_llave(dims, coords, llave):
    dims = tuple(dm for dm in dims if dm not in llave or not isinstance(llave[dm], int))
    coords = frozendict({
        dm: tuple(coords[dm][í] for í in llave[dm]) if dm in llave else coords[dm] for dm in dims
    })
    return dims, coords


class CoordsDatos(dict):
    def __init__(símismo, *args, **argsll):
        super().__init__(*args, **argsll)

    def __setitem__(símismo, llave, valor):
        raise ValueError(
            "No se pueden asignar coordenadas directamente en un objeto Datos. Llamar datos.asignar_coords en vez."
        )


class Loc(object):
    def __init__(símismo, datos):
        símismo.datos = datos

    def _índices(símismo, dic):
        if isinstance(dic, Datos):
            dic = dic.coords_internas
        return _calc_índices_loc(símismo.datos.coords_internas, dic)

    def __getitem__(símismo, itema):
        return símismo.datos[símismo._índices(itema)]

    def __setitem__(símismo, llave, valor):
        símismo.datos[símismo._índices(llave)] = valor


class Datos(object):
    def __init__(símismo, val, dims, coords, nombre=None, atribs=None, _verif=True, _conv_coords=None):

        if not isinstance(val, np.ndarray):
            if isinstance(val, (list, tuple)):
                val = np.array(val)
            else:
                val = np.full(shape=tuple(len(coords[dm]) for dm in dims), fill_value=val)

        símismo.matr = val
        símismo.dims = dims
        símismo._coords_internas = coords

        símismo._conv_coords = _conv_coords

        símismo.atribs = (atribs or {}).copy()
        símismo.nombre = nombre

        if _verif:
            símismo._verif_init()
        elif símismo._conv_coords is None:
            raise ValueError("Se debe especificar `_conv_coords` si `_verif_init===False`")

        símismo.loc = Loc(símismo)

    def _verif_init(símismo):
        símismo.dims = tuple(símismo.dims)
        símismo._conv_coords = {}
        símismo._coords_internas = símismo.codificar_coords(símismo._coords_internas)

        if set(símismo.dims) != set(símismo.coords_internas):
            raise ValueError(set(símismo.dims), set(símismo.coords_internas))
        frm = tuple(len(símismo.coords_internas[dm]) for dm in símismo.dims)
        if frm != símismo.matr.shape:
            raise ValueError(frm, símismo.matr.shape)

    @classmethod
    def de_xarray(cls, datos):
        coords = frozendict({
            ll: tuple(v.values if not np.issubdtype(v.values.dtype, np.datetime64) else pd.to_datetime(v.values))
            for ll, v in datos.coords.items()
        })
        return Datos(val=datos.values.copy(), dims=datos.dims, coords=coords, nombre=datos.name, atribs=datos.attrs)

    def a_xarray(símismo):
        return xr.DataArray(
            símismo.matr.copy(),
            coords=símismo.coords,
            dims=list(símismo.dims),
            name=símismo.nombre,
            attrs=símismo.atribs
        )

    def codificar_coords(símismo, coords: dict[str, Any]):
        return codificar_coords(coords, símismo)

    def _decodificar_coord(símismo, valor):
        def decodificar(v):
            return símismo._conv_coords[v] if v in símismo._conv_coords else v

        if isinstance(valor, (list, tuple)):
            return [decodificar(v) for v in valor]
        return decodificar(valor)

    def copiar(símismo):
        return Datos(símismo.matr.copy(), dims=símismo.dims, coords=símismo.coords_internas, nombre=símismo.nombre,
                     atribs=símismo.atribs, _conv_coords=símismo._conv_coords, _verif=False)

    def renombrar(símismo, cambios):
        copia = símismo.copiar()
        coords_final = dict(copia.coords_internas)
        for ant, nv in cambios.items():
            coords_final[nv] = coords_final.pop(ant)
        copia._coords_internas = frozendict(coords_final)
        copia.dims = tuple(cambios[dm] if dm in cambios else dm for dm in copia.dims)
        return copia

    def asignar_coords(símismo, eje, coords):
        símismo._coords_internas = frozendict({**símismo._coords_internas, **{
            eje: coords
        }})

    def llenar_nan(símismo, val):
        símismo.matr[np.isnan(símismo.matr)] = val
        return símismo

    def llenar_inf(símismo, val):
        símismo.matr[np.isinf(símismo.matr)] = val
        return símismo

    def nuevo_como(símismo, vals, excluir: Union[str, List[str]] = None):
        coords = símismo.coords_internas
        dims = símismo.dims
        if excluir:
            if isinstance(excluir, str):
                excluir = [excluir]

            coords = frozendict({ll: v for ll, v in coords.items() if ll not in excluir})
            dims = tuple(dm for dm in dims if dm not in excluir)

        return Datos(
            vals, dims=dims, coords=coords, nombre=símismo.nombre, atribs=símismo.atribs,
            _conv_coords=símismo._conv_coords, _verif=False
        )

    def transponer(símismo, dims):
        orden = [símismo.dims.index(d) for d in dims]
        return Datos(np.transpose(símismo.matr, orden), dims=dims, coords=símismo.coords_internas,
                     nombre=símismo.nombre,
                     atribs=símismo.atribs, _conv_coords=símismo._conv_coords, _verif=False)

    def expandir_dims(símismo, coords):
        if isinstance(coords, Datos):
            o_dims, o_coords = coords.dims, coords.coords_internas
        else:
            o_coords = coords
            o_dims = tuple(o_coords)
        return _expandir_dims(símismo, dims=o_dims, coords=o_coords)

    def dejar(símismo, dim):

        if len(símismo.coords_internas[dim]) != 1:
            raise ValueError('Dimensiones deben tener tamaño 1.')

        símismo.matr = símismo.matr.squeeze(símismo.dims.index(dim))
        símismo.dims = tuple(x for x in símismo.dims if x not in dim)
        símismo._coords_internas = frozendict({dm: símismo.coords_internas[dm] for dm in símismo.dims})
        símismo.loc = Loc(símismo)
        return símismo

    @property
    def coords_internas(símismo):
        return símismo._coords_internas

    @property
    def coords(símismo):
        return CoordsDatos({ll: símismo._decodificar_coord(v) for ll, v in símismo.coords_internas.items()})

    def _í_dims(símismo, dims):
        if isinstance(dims, str):
            return símismo.dims.index(dims)
        else:
            return tuple([símismo.dims.index(d) for d in dims])

    def _índices(símismo, dic):
        return _calc_índices(frozendict(dic), símismo.coords_internas, símismo.dims)

    def _proc_llave(símismo, llave):
        return _proc_llave(símismo.dims, símismo.coords_internas, frozendict(llave))

    def redond(símismo, n=None):
        return símismo.nuevo_como(np.round(símismo.matr, decimals=n or 0))

    def f_eje(símismo, f, dim, *args, **argsll):
        if dim is not None:
            vals = f(símismo.matr, axis=símismo._í_dims(dim), *args, **argsll)

            # Algunas funciones como `np.take_along_axis` no destruyen el eje especificado
            excluir = dim if vals.shape != símismo.matr.shape else None
            return símismo.nuevo_como(vals, excluir=excluir)
        return f(símismo.matr)

    def prod(símismo, dim=None):
        return símismo.f_eje(np.ndarray.prod, dim=dim)

    def suma(símismo, dim=None):
        return símismo.f_eje(np.ndarray.sum, dim=dim)

    def cualquier(símismo, dim=None):
        return símismo.f_eje(np.ndarray.any, dim=dim)

    def donde(símismo, cond, otro):
        m_cond = alinear_como(símismo, cond).matr if isinstance(cond, Datos) else cond
        m_otro = alinear_como(símismo, otro).matr if isinstance(otro, Datos) else otro
        return símismo.nuevo_como(np.where(m_cond, símismo.matr, m_otro))

    def f(símismo, f, *args, **argsll):
        copia = símismo.copiar()
        copia.fi(f, *args, **argsll)
        return copia

    def fi(símismo, f, *args, **argsll):
        símismo.matr[:] = f(símismo.matr, *args, **argsll)
        return símismo

    def __add__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr + y)

    def __sub__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr - y)

    def __mul__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr * y)

    def __mod__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr % y)

    def __truediv__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr / y)

    def __floordiv__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr // y)

    def __pow__(símismo, pot, módulo=None):
        x, y = alinear_2(símismo, pot)
        return x.nuevo_como(x.matr ** y)

    def __and__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr & y)

    def __or__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr | y)

    def __radd__(símismo, otro):
        return símismo + otro

    def __rsub__(símismo, otro):
        return -símismo + otro

    def __rmul__(símismo, otro):
        return símismo * otro

    def __rmod__(símismo, otro):
        otro = símismo.nuevo_como(otro)
        return otro % símismo

    def __rtruediv__(símismo, otro):
        return símismo.nuevo_como(1 / símismo.matr) * otro

    def __rfloordiv__(símismo, otro):
        return (otro / símismo).f(np.floor)

    def __rpow__(símismo, otro):
        otro = símismo.nuevo_como(otro)
        return otro % símismo

    def __rand__(símismo, otro):
        return símismo & otro

    def __ror__(símismo, otro):
        raise símismo | otro

    def __eq__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr == y)

    def __gt__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr > y)

    def __lt__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr < y)

    def __ge__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr >= y)

    def __le__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr <= y)

    def __ne__(símismo, otro):
        x, y = alinear_2(símismo, otro)
        return x.nuevo_como(x.matr != y)

    def __iadd__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo + otro
            símismo.loc[res] = res
        else:
            símismo.matr += otro
        return símismo

    def __isub__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo - otro
            símismo.loc[res] = res
        else:
            símismo.matr -= otro
        return símismo

    def __imul__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo * otro
            símismo.loc[res] = res
        else:
            símismo.matr *= otro
        return símismo

    def __imod__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo % otro
            símismo.loc[res] = res
        else:
            símismo.matr %= otro
        return símismo

    def __itruediv__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo / otro
            símismo.loc[res] = res
        else:
            símismo.matr /= otro
        return símismo

    def __ifloordiv__(símismo, otro):
        if isinstance(otro, Datos):
            res = símismo // otro
            símismo.loc[res] = res
        else:
            símismo.matr //= otro
        return símismo

    def __ipow__(símismo, pot, módulo=None):
        if isinstance(pot, Datos):
            res = símismo ** pot
            símismo.loc[res] = res
        else:
            símismo.matr **= pot
        return símismo

    def __abs__(símismo):
        return símismo.nuevo_como(np.abs(símismo.matr))

    def __neg__(símismo):
        return símismo.nuevo_como(-símismo.matr)

    def __invert__(símismo):
        return símismo.nuevo_como(~símismo.matr)

    def __getitem__(símismo, itema):
        if isinstance(itema, (dict, frozendict)):
            dims, coords = símismo._proc_llave(itema)
            return Datos(símismo.matr[símismo._índices(itema)], dims=dims, coords=coords, nombre=símismo.nombre,
                         atribs=símismo.atribs, _conv_coords=símismo._conv_coords, _verif=False)
        raise TypeError(type(itema))

    def __setitem__(símismo, llave, valor):
        if isinstance(valor, Datos):
            if isinstance(llave, (dict, frozendict)):
                dims, coords = símismo._proc_llave(llave)
            else:
                dims, coords = símismo.dims, símismo.coords_internas

            valor = _alinear_como_coords(dims=dims, coords=coords, otro=valor).matr

        if isinstance(llave, frozendict):
            símismo.matr[símismo._índices(llave)] = valor
        else:
            símismo.matr[llave] = valor


def f_numpy(f, *datos, **argsll):
    dts = alinear(*datos)
    plntll = next((dt for dt in dts if isinstance(dt, Datos)), None)
    args = [dt.matr if isinstance(dt, Datos) else dt for dt in dts]
    if plntll:
        return plntll.nuevo_como(f(*args, **argsll))
    else:
        return f(*dts, **argsll)


def máximo(x, y):
    return f_numpy(np.maximum, x, y)


def mínimo(x, y):
    return f_numpy(np.minimum, x, y)


def donde(cond, x, y):
    return f_numpy(np.where, cond, x, y)


def lleno_como(otro, valor, tipod=None):
    return otro.nuevo_como(np.full(otro.matr.shape, valor, dtype=tipod))


def combinar(*matrs):
    dims = matrs[0].dims
    coords = {}

    for d in dims:
        coords[d] = []
        for m in matrs:
            coords[d] += [v for v in m.coords_internas[d] if v not in coords[d]]
        coords[d] = tuple(coords[d])

    _conv_coords = {c: v for m in matrs for c, v in m._conv_coords.items()}

    final = Datos(np.nan, dims=dims, coords=frozendict(coords), _conv_coords=_conv_coords, _verif=False)
    for m in matrs:
        final.loc[m.coords_internas] = m

    return final


def alinear(*datos):
    dts = _intersec_datos(*datos)
    return _redimensionar(*dts)


def alinear_2(dt1, dt2):
    if isinstance(dt2, Datos):
        dt1 = _expandir_dims(dt1, dims=dt2.dims, coords=dt2.coords_internas)
        dt2 = _expandir_dims(dt2, dims=dt1.dims, coords=dt1.coords_internas, guardar_orden=False)
        if dt1.coords_internas == dt2.coords_internas:
            return dt1, dt2.matr
        else:
            crds = _intersec_coords(dt1.coords_internas, dt2.coords_internas)
            return dt1.loc[crds], dt2.loc[crds].matr
    return dt1, dt2


def alinear_como(como, otro):
    return _alinear_como_coords(dims=como.dims, coords=como.coords_internas, otro=otro)


def _alinear_como_coords(dims, coords, otro):
    otro = _expandir_dims(otro, dims=dims, coords=coords, guardar_orden=False)
    if otro.coords_internas == coords:
        return otro
    else:
        return otro.loc[coords]


def _redimensionar(*args):
    datos = [x for x in args if isinstance(x, Datos)]

    dims = tuple(dict.fromkeys(dm for dt in datos for dm in dt.dims))
    coords = frozendict({dm: next(dt.coords_internas[dm] for dt in datos if dm in dt.coords_internas) for dm in dims})

    return [_expandir_dims(x, dims, coords, guardar_orden=False) if isinstance(x, Datos) else x for x in args]


def _redimensionar_como(plntll, *args):
    return [_expandir_dims(x, plntll.dims, plntll.coords_internas) if isinstance(x, Datos) else x for x in args]


def _intersec_datos(*args):
    datos = [x for x in args if isinstance(x, Datos)]

    c_final = _intersec_coords(*[dt.coords_internas for dt in datos])
    return [
        x.loc[frozendict({dm: tuple(c) for dm, c in c_final.items() if dm in x.coords_internas})] if isinstance(x,
                                                                                                                Datos) else x
        for x in args
    ]


@functools.lru_cache
def _intersec_coords(*args):
    dims = set(dm for crd in args for dm in crd)
    c_final = frozendict({
        dm: tuple(dict.fromkeys([
            c for crds in args if dm in crds for c in crds[dm]
            if all(c in crds_o[dm] for crds_o in args if dm in crds_o)
        ]))
        for dm in dims
    })

    return c_final


@functools.lru_cache
def _gen_f_expandir_dims(dims_datos, coords_datos, dims, coords, guardar_orden):
    dims_prior, extras = (dims_datos, dims) if guardar_orden else (dims, dims_datos)
    d_final = tuple([*dims_prior, *(d for d in extras if d not in dims_prior)])

    if dims_datos != d_final:
        if set(dims_datos) != set(d_final):
            # Agregar dims
            c_final = {dm: list(coords_datos[dm]) if dm in coords_datos else coords[dm] for dm in d_final}
            frm_matr = tuple(len(v) if ll in coords_datos else 1 for ll, v in c_final.items())
            frm_final = tuple(len(v) for v in c_final.values())

            def f(datos):
                # Copia de `broadcast_to` necesaria para evitar error Numpy con Datos.matr += V después
                datos = Datos(np.broadcast_to(datos.matr.reshape(frm_matr), frm_final).copy(), d_final,
                              coords=frozendict(
                                  {ll: tuple(v) if isinstance(v, list) else v for ll, v in c_final.items()}),
                              nombre=datos.nombre, atribs=datos.atribs, _conv_coords=datos._conv_coords, _verif=False)
                # Reordenar dims
                return datos.transponer(d_final)
        else:
            def f(datos):
                # Reordenar dims
                return datos.transponer(d_final)
    else:
        def f(datos):
            return datos
    return f


def _expandir_dims(datos, dims, coords, guardar_orden=True):
    f = _gen_f_expandir_dims(datos.dims, datos.coords_internas, dims, coords, guardar_orden=guardar_orden)
    return f(datos)


def codificar_coords(coords, datos: Optional[Datos] = None):
    return frozendict({
        ll: tuple(
            _codificar_coord(o, datos) for o in v
        ) if isinstance(v, (range, tuple, list, set, np.ndarray, pd.Index)) else (_codificar_coord(v, datos),) for
        ll, v in coords.items()
    })


def _codificar_coord(valor, datos):
    if isinstance(valor, (str, int, np.number)):
        return valor

    if type(valor) is pd.Timestamp:
        código = valor.toordinal()
    else:
        código = id(valor)
    if datos:
        datos._conv_coords[código] = valor
    return código