tinamit0/datos/bd.py
import os
from warnings import warn as avisar
import numpy as np
import pandas as pd
import xarray as xr
from tinamit.config import _
from .fuente import FuentePandas, FuenteDic, FuenteVarXarray, FuenteBaseXarray, FuenteCSV, Fuente
class BD(object):
"""
Una base de datos combina varias :class:`~tinamit.datos.fuente.Fuente`.
"""
def __init__(símismo, fuentes):
"""
Parameters
----------
fuentes: Fuente or list
Las fuentes de la base de datos.
"""
fuentes = [fuentes] if not isinstance(fuentes, (list, tuple)) else fuentes
símismo.fuentes = [_gen_fuente(f) for f in fuentes]
símismo.variables = list(set((v for f in símismo.fuentes for v in f.variables)))
símismo.lugares = np.unique(
np.concatenate([np.unique(fnt.lugares) for fnt in símismo.fuentes if fnt.lugares is not None])
)
fechas = [np.unique(fnt.fechas) for fnt in símismo.fuentes if fnt.fechas is not None]
if fechas:
símismo.fechas = np.unique(
np.concatenate(fechas)
)
else:
símismo.fechas = fechas
def obt_vals(símismo, vars_interés=None, lugares=None, fechas=None):
"""
Devuelve los valores de unos variables de interés.
Parameters
----------
vars_interés: str or list
Los variables de interés.
lugares: list
Lugares de interés.
fechas: tuple or list
Returns
-------
xr.DataArray, xr.Dataset
``xr.DataArray`` si ``vars_interés`` es ``str``, ``xr.Dataset`` si ``vars_interés`` es ``list``.
"""
vr_único = False
if isinstance(vars_interés, str):
vars_interés = [vars_interés]
vr_único = True
if vars_interés is None:
vars_interés = símismo.variables
l_vals_fnts = [
f.obt_vals(vars_interés=vars_interés, lugares=lugares, fechas=fechas) for f in símismo.fuentes
]
l_vals_fnts = [v for v in l_vals_fnts if len(v.columns)]
vals = l_vals_fnts[0]
for vl in l_vals_fnts[1:]:
nuevas_cols = [cl for cl in vl.columns if cl not in vals.columns]
vals = vals.join(vl[nuevas_cols], how='outer').fillna(vl)
return vals[vars_interés[0]] if vr_único else vals
def interpolar(símismo, vars_interés, lugares=None, fechas=None, extrap=False):
"""
Interpola datos por fecha, tomando el lugar en cuenta.
Parameters
----------
vars_interés: str or list
Los variables de interés.
lugares: list
Lugares de interés.
fechas: list or str or datetime.datetime
Las fechas de interés.
extrap: bool
Si hay que extrapolar también.
Returns
-------
xr.DataArray, xr.Dataset
``xr.DataArray`` si ``vars_interés`` es ``str``, ``xr.Dataset`` si ``vars_interés`` es ``list``.
"""
datos = símismo.obt_vals(vars_interés=vars_interés, lugares=lugares)
if datos.size:
datos = _interpolar_xr(datos, fechas=fechas)
if extrap:
datos = datos.fillna(method='bfill')
datos = datos.fillna(method='ffill')
return datos
def _gen_fuente(fnt, nombre=None, lugares=None, fechas=None):
if isinstance(fnt, pd.DataFrame):
return FuentePandas(fnt, nombre or 'pandas', lugares=lugares, fechas=fechas)
elif isinstance(fnt, dict):
return FuenteDic(fnt, nombre or 'dic', lugares=lugares, fechas=fechas)
elif isinstance(fnt, xr.Dataset):
return FuenteBaseXarray(fnt, nombre or 'xarray', lugares=lugares, fechas=fechas)
elif isinstance(fnt, xr.DataArray):
return FuenteVarXarray(fnt, nombre or 'xarray', lugares=lugares, fechas=fechas)
elif isinstance(fnt, str):
ext = os.path.splitext(fnt)[1]
if ext == '.csv':
return FuenteCSV(fnt, lugares=lugares, fechas=fechas)
else:
raise ValueError(_('Formato de base de datos "{}" no reconocido.').format(ext))
return fnt
def _interpolar_xr(m, fechas=None):
if fechas is not None:
raise ValueError
interpolado = m.unstack().interpolate(limit_area='inside').stack()
if len(interpolado.dropna()):
return interpolado
avisar('Extrapolando por falta de datos.')
return m.unstack().interpolate().stack()
if m.sizes[_('fecha')] > 1:
m = m.interpolate_na(_('fecha')).fillna(m)
if fechas is not None:
fechas = [fechas] if not isinstance(fechas, (tuple, list)) else fechas
m = m.interp(
{_('fecha'): pd.DatetimeIndex(fechas)}
).fillna(m)
m = m.dropna(_('lugar'), how='all')
return m.stack(n=[_('lugar'), _('fecha')])