julienmalard/Tikon

View on GitHub
tikon/ecs/dists/anlt.py

Summary

Maintainability
C
1 day
Test Coverage
from typing import Union, Optional, TypedDict, Any, cast
from warnings import warn as avisar

import numpy as np
from matplotlib.axes import Axes
from scipy import stats as estad
from scipy.special import expit, logit
from scipy.stats._continuous_distns import FitDataError
from scipy.stats._distn_infrastructure import rv_continuous_frozen
from typing_extensions import NotRequired

from tikon.ecs.dists import utils
from tikon.ecs.dists.dibs import dibujar_dist
from tikon.ecs.dists.dists import Dist, _escala_inf, _dist_mu, DicDist
from tikon.ecs.dists.utils import obt_scipy, obt_nombre, obt_prms_obj_scipy, líms_dist, clase_scipy, prms_dist, \
    proc_líms, \
    Líms_Con_None, Líms_Numéricas
from tikon.ecs.utils import líms_compat
from tikon.tipos import Tipo_Matriz_Numérica, Tipo_Matriz_Núm_Entero, Tipo_Valor_Numérico


class DicDistAnalítica(DicDist, TypedDict):
    transf: Optional["DicTransfDist"]
    paráms: tuple[tuple, Tipo_Valor_Numérico, Tipo_Valor_Numérico]
    dist: str


class DistAnalítica(Dist):
    dist: rv_continuous_frozen
    _transf: Optional["TransfDist"]
    paráms = tuple[tuple, int | float, int | float]
    líms: tuple[Tipo_Valor_Numérico, Tipo_Valor_Numérico]

    def __init__(símismo, dist: rv_continuous_frozen, transf: "TransfDist" = None):

        símismo.nombre_dist = obt_nombre(dist)
        paráms = obt_prms_obj_scipy(dist)
        if transf is None:
            prms, ubic, escl = paráms
            if ubic != 0 or escl != 1:
                dist = obt_scipy(símismo.nombre_dist, (prms, 0, 1))
                transf = TransfDist(None, ubic=ubic, escl=escl)
                paráms = (prms, 0, 1)

        símismo.dist = dist
        símismo.paráms = paráms
        símismo._transf = transf

        símismo.líms = tuple(símismo.transf_vals(np.array(líms_dist(símismo.dist))))

    def obt_vals(símismo, n: int) -> Tipo_Matriz_Numérica:
        return símismo.transf_vals(símismo.dist.rvs(n))

    def obt_vals_índ(símismo, í: Tipo_Matriz_Núm_Entero) -> Tipo_Matriz_Numérica:
        return símismo.obt_vals(n=len(í))

    def tmñ(símismo) -> Union[int, float]:
        return np.inf

    def aprox_líms(símismo, prc) -> np.ndarray[Any, np.dtype[np.number]]:

        # Las superficies de las colas que hay que dejar afuera del rango de los límites
        colas = ((1 - prc) / 2, 0.5 + prc / 2)

        líms_d = np.array([símismo.dist.ppf(colas[0]), símismo.dist.ppf(colas[1])])

        return símismo.transf_vals(líms_d)

    def transf_vals(símismo, vals: Tipo_Matriz_Numérica) -> Tipo_Matriz_Numérica:

        if símismo._transf is not None:
            vals = símismo._transf.transf(vals)

        return vals

    def dibujar(símismo, nombre=None, ejes=None, argsll=None) -> Axes:
        return dibujar_dist(símismo, nombre=nombre or símismo.nombre_dist, ejes=ejes, argsll=argsll)

    def a_dic(símismo) -> DicDistAnalítica:
        return DicDistAnalítica(
            tipo=símismo.__class__.__name__,
            transf=símismo._transf.a_dic() if símismo._transf else None,
            dist=símismo.nombre_dist,
            paráms=símismo.paráms
        )

    @classmethod
    def de_dic(cls, dic: DicDist) -> "DistAnalítica":
        if dic["tipo"] != "DistAnalítica":
            raise ValueError(dic)

        dicDistAnalítica = cast(DicDistAnalítica, dic)
        transf = TransfDist.de_dic(dicDistAnalítica['transf']) if dicDistAnalítica['transf'] else None
        dist_sp = obt_scipy(dicDistAnalítica['dist'], dicDistAnalítica['paráms'])
        return cls(dist=dist_sp, transf=transf)

    @classmethod
    def de_líms(cls, líms: Optional[Líms_Con_None]) -> "DistAnalítica":
        líms = proc_líms(líms)

        if líms[0] == -np.inf:
            if líms[1] == np.inf:
                return DistAnalítica(dist=estad.norm(), transf=TransfDist(None, ubic=0, escl=_escala_inf))

            return DistAnalítica(dist=estad.expon(), transf=TransfDist(None, ubic=líms[1], escl=-_escala_inf))

        if líms[1] == np.inf:
            return DistAnalítica(dist=estad.expon(), transf=TransfDist(None, ubic=líms[0], escl=_escala_inf))

        return DistAnalítica(dist=estad.uniform(), transf=TransfDist(None, ubic=líms[0], escl=líms[1] - líms[0]))

    @classmethod
    def de_dens(
            cls, dens: int | float, líms_dens: Optional[Líms_Con_None],
            líms: Optional[Líms_Con_None]
    ) -> "DistAnalítica":
        líms_dens_resueltas = np.array(proc_líms(líms_dens))
        líms_resueltas = np.array(proc_líms(líms))
        líms_compat(líms_dens_resueltas, líms_resueltas)

        if dens == 1:
            if np.isinf(líms_dens_resueltas[0]) or np.isinf(líms_dens_resueltas[1]):
                raise ValueError(
                    'No se puede especificar densidad de 1 con rango ilimitado como "{}".'.format(líms_dens_resueltas)
                )
            return DistAnalítica(
                dist=estad.uniform(), transf=TransfDist(None, ubic=líms_dens_resueltas[0], escl=líms_dens_resueltas[1] - líms_dens_resueltas[0])
            )
        elif dens <= 0 or dens > 1:
            raise ValueError('La densidad debe ser en (0, 1].')

        if líms_resueltas[0] == -np.inf:
            if líms_resueltas[1] == np.inf:
                transf = None
            else:
                escl = (líms_resueltas[1] - líms_dens_resueltas[1]) or ((líms_resueltas[1] - líms_dens_resueltas[0]) if líms_dens_resueltas[0] != -np.inf else 1)
                transf = TransfDist('LnExp', ubic=líms_resueltas[1], escl=-escl)

        elif líms_resueltas[1] == np.inf:
            escl = (líms_dens_resueltas[0] - líms_resueltas[0]) or ((líms_dens_resueltas[1] - líms_resueltas[0]) if líms_dens_resueltas[1] != np.inf else 1)
            transf = TransfDist('LnExp', ubic=líms_resueltas[0], escl=escl)
        else:
            transf = TransfDist('Expit', ubic=líms_resueltas[0], escl=líms_resueltas[1] - líms_resueltas[0])

        if transf is None:
            líms_dens_intern = líms_dens_resueltas
        else:
            líms_dens_intern = transf.transf_inv(líms_dens_resueltas)
            líms_dens_intern.sort()

        if líms_dens_intern[0] == -np.inf:
            if líms_dens_intern[1] == np.inf:
                raise ValueError(
                    'Rangos idénticos como {r1} y {r2} no pueden tener densidad inferior a '
                    '1.'.format(r1=líms_resueltas, r2=líms_dens_resueltas)
                )
            mu = líms_dens_intern[1] - _dist_mu
            sg = -_dist_mu / estad.norm.ppf(1 - dens)

        elif líms_dens_intern[1] == np.inf:
            mu = líms_dens_intern[0] + _dist_mu
            sg = -_dist_mu / estad.norm.ppf(1 - dens)

        else:
            mu = (líms_dens_intern[1] + líms_dens_intern[0]) / 2
            sg = (líms_dens_intern[0] - líms_dens_intern[1]) / 2 / estad.norm.ppf((1 - dens) / 2)

        return DistAnalítica(estad.norm(loc=mu, scale=sg), transf=transf)

    @classmethod
    def de_traza(cls, trz: Tipo_Matriz_Numérica, líms: Líms_Con_None, permitidas: list[str] = None):
        permitidas = permitidas or list(utils.dists)

        líms = proc_líms(líms)
        if trz.min() < líms[0] or trz.max() > líms[1]:
            raise ValueError('Valores en traza deben caber en los límites teoréticos de la distribución.')

        # Un diccionario para guardar el mejor ajuste
        class DicMejorAjuste(TypedDict):
            p: int | float
            dist: rv_continuous_frozen
            transf: TransfDist

        mejor_ajuste: DicMejorAjuste | None = None

        for nmbr in permitidas:
            líms_d = líms_dist(nmbr)
            try:
                _líms_compat_teor(líms, líms_d)
            except ValueError:
                continue

            if líms_d[0] == -np.inf and líms_d[1] == np.inf:
                transf = _gen_transf_sin_líms(trz, líms)
                restric = {}
            elif líms_d[0] == -np.inf or líms_d[1] == np.inf:
                transf = _gen_transf_1_lím(trz, líms, líms_d)
                restric = {'floc': 0}
            else:
                escl = (líms[1] - líms[0]) / (líms_d[1] - líms_d[0])
                ubic = líms[0] - líms_d[0] * escl
                transf = TransfDist(None, ubic=ubic, escl=escl)
                # En el caso [R, R], limitamos los valores inferiores y superiores de la distribución.
                restric = {'floc': 0, 'fscale': 1}

            trz_transf = transf.transf_inv(trz)

            cls_sp = clase_scipy(nmbr)
            if len(prms_dist(nmbr)) == len(restric):
                prms = {'loc': 0, 'scale': 1}
            else:
                try:
                    ajustados = cls_sp.fit(trz_transf, **restric)
                except FitDataError:
                    continue
                prms = {pr: vl for pr, vl in zip(prms_dist(nmbr), ajustados)}
            p = estad.kstest(rvs=trz_transf, cdf=cls_sp(**prms).cdf)[1]

            # Si el ajuste es mejor que el mejor ajuste anterior...
            if not mejor_ajuste or p > mejor_ajuste['p']:
                # Guardarlo
                mejor_ajuste = dict(dist=cls_sp(**prms), transf=transf, p=p)

        if not mejor_ajuste:
            raise ValueError('No se encontró distribución permitida compatible.')

        # Si no logramos un buen aujste, avisar al usuario.
        if mejor_ajuste['p'] <= 0.10:
            avisar('El ajuste de la mejor distribución quedó muy mal (p = {:.6f}).'.format(mejor_ajuste['p']))

        return DistAnalítica(dist=mejor_ajuste["dist"], transf=mejor_ajuste["transf"])


class DicTransfDist(TypedDict):
    transf: Optional[str]
    ubic: Tipo_Valor_Numérico
    escl: Tipo_Valor_Numérico


class TransfDist(object):
    def __init__(símismo, transf: Optional[str], ubic: Tipo_Valor_Numérico = 0, escl: Tipo_Valor_Numérico = 1):
        """

        Parameters
        ----------
        transf: str or None
        ubic: int or float
        escl: int or float
        """
        if transf is None:
            símismo._f = símismo._f_inv = lambda x: x
            símismo._transf = None
        else:
            símismo._transf = transf.lower()
            if símismo._transf == 'expit':
                símismo._f = expit
                símismo._f_inv = logit
            elif símismo._transf == 'lnexp':
                símismo._f = lnexp
                símismo._f_inv = invlnexp
            else:
                raise ValueError(transf)

        símismo._ubic = ubic
        símismo._escl = escl

    def transf(símismo, vals: Tipo_Matriz_Numérica) -> Tipo_Matriz_Numérica:
        return símismo._f(vals) * símismo._escl + símismo._ubic

    def transf_inv(símismo, vals: Tipo_Matriz_Numérica) -> Tipo_Matriz_Numérica:
        return símismo._f_inv((vals - símismo._ubic) / símismo._escl)

    def a_dic(símismo) -> DicTransfDist:
        return {
            'transf': símismo._transf,
            'ubic': símismo._ubic,
            'escl': símismo._escl
        }

    @classmethod
    def de_dic(cls, dic: DicTransfDist) -> "TransfDist":
        return TransfDist(dic['transf'], ubic=dic['ubic'], escl=dic['escl'])


def _gen_transf_sin_líms(trz: Tipo_Matriz_Numérica, líms: Líms_Numéricas) -> TransfDist:
    if líms[0] == -np.inf:
        if líms[1] == np.inf:
            # noinspection PyTypeChecker
            return TransfDist(None, ubic=np.mean(trz), escl=np.std(trz))
        return TransfDist('LnExp', ubic=líms[1], escl=-np.std(trz))

    if líms[1] == np.inf:
        # noinspection PyTypeChecker
        return TransfDist('LnExp', ubic=líms[0], escl=np.std(trz))
    return TransfDist('Expit', ubic=líms[0], escl=líms[1] - líms[0])


def _gen_transf_1_lím(trz: Tipo_Matriz_Numérica, líms: Líms_Numéricas, líms_d: Líms_Numéricas) -> TransfDist:
    if líms_d[0] == -np.inf:
        líms_d = (-líms_d[1], np.inf)
        líms = (-líms[1], -líms[0])

    if líms[0] == -np.inf:
        escl = trz.std()
        ubic = líms[1] - líms_d[0] * escl
        return TransfDist(None, ubic=ubic, escl=-escl)
    elif líms[1] == np.inf:
        escl = trz.std()
        ubic = líms[0] - líms_d[0] * escl
        return TransfDist(None, ubic=ubic, escl=escl)

    raise ValueError(líms)


def lnexp(x: Tipo_Matriz_Numérica) -> Tipo_Matriz_Numérica:
    return np.log(np.exp(x) + 1)


def invlnexp(x: Tipo_Matriz_Numérica) -> Tipo_Matriz_Numérica:
    return np.where(x >= 50, x, np.log(np.exp(x) - 1))


def _líms_compat_teor(líms: Líms_Con_None, ref: Líms_Con_None) -> None:
    líms = proc_líms(líms)
    ref = proc_líms(ref)
    suma = sum([ref[0] == -np.inf, ref[1] == np.inf])
    if suma == 1 and sum([líms[0] == -np.inf, líms[1] == np.inf]) != 1:
        raise ValueError
    elif suma == 0 and (líms[0] == -np.inf or líms[1] == np.inf):
        raise ValueError