src/wafo/kdetools/kdetools.py
#!/usr/bin/env python
# -------------------------------------------------------------------------
# Name: kdetools
# Purpose:
#
# Author: pab
#
# Created: 01.11.2008
# Copyright: (c) pab 2008
# Licence: LGPL
# -------------------------------------------------------------------------
# from abc import ABCMeta, abstractmethod
import copy
import warnings
import numpy as np
from numpy import linalg
import scipy.stats as st
from scipy import interpolate, linalg, special
from numpy import sqrt, atleast_2d, meshgrid
from numpy.fft import fftn, ifftn
from wafo.misc import nextpow2
from wafo.containers import PlotData
from wafo.testing import test_docstrings
from wafo.kdetools.kernels import iqrange, qlevels, Kernel
from wafo.kdetools.gridding import gridcount
__all__ = ['TKDE', 'KDE', 'test_docstrings', 'KRegression', 'BKRegression']
_TINY = np.finfo(float).tiny
# _REALMIN = np.finfo(float).min
_REALMAX = np.finfo(float).max
_EPS = np.finfo(float).eps
def _assert(cond, msg):
if not cond:
raise ValueError(msg)
def _assert_warn(cond, msg):
if not cond:
warnings.warn(msg)
def _invnorm(q):
return special.ndtri(q)
def _logit(p):
pc = p.clip(min=_TINY, max=1-_EPS) # trick to avoid division by zero in log(pc) and log1p(-pc)
return np.log(pc) - np.log1p(-pc)
# def _logitinv(x):
# return 1.0 / (np.exp(-x) + 1)
class _KDE(object):
def __init__(self, data, kernel=None, xmin=None, xmax=None):
self.dataset = data
self.xmin = xmin
self.xmax = xmax
self.kernel = kernel if kernel else Kernel('gauss')
self.args = None
@property
def inc(self):
return self._inc
@inc.setter
def inc(self, inc):
# pylint: disable=attribute-defined-outside-init
self._inc = inc
@property
def dataset(self):
return self._dataset
@dataset.setter
def dataset(self, data):
# pylint: disable=attribute-defined-outside-init
self._dataset = atleast_2d(data)
@property
def n(self):
return self.dataset.shape[1]
@property
def d(self):
return self.dataset.shape[0]
@property
def sigma(self):
"""minimum(stdev, 0.75 * interquartile-range)"""
iqr = iqrange(self.dataset, axis=-1)
sigma = np.minimum(np.std(self.dataset, axis=-1, ddof=1), iqr / 1.34)
return sigma
@property
def xmin(self):
return self._xmin
@xmin.setter
def xmin(self, xmin):
if xmin is None:
xmin = self.dataset.min(axis=-1) - 2 * self.sigma
# pylint: disable=attribute-defined-outside-init
self._xmin = self._check_xmin(xmin * np.ones(self.d))
def _check_xmin(self, xmin):
return xmin
@property
def xmax(self):
return self._xmax
@xmax.setter
def xmax(self, xmax):
if xmax is None:
xmax = self.dataset.max(axis=-1) + 2 * self.sigma
# pylint: disable=attribute-defined-outside-init
self._xmax = self._check_xmax(xmax * np.ones(self.d))
def _check_xmax(self, xmax):
return xmax
def eval_grid_fast(self, *args, **kwds):
"""Evaluate the estimated pdf on a grid using fft.
Parameters
----------
arg_0,arg_1,... arg_d-1 : vectors
Alternatively, if no vectors is passed in then
arg_i = linspace(self.xmin[i], self.xmax[i], self.inc)
output : string optional
'value' if value output
'data' if object output
Returns
-------
values : array-like
The values evaluated at meshgrid(*args).
"""
return self.eval_grid_fun(self._eval_grid_fast, *args, **kwds)
def _eval_grid_fast(self, *args, **kwds):
pass
def eval_grid(self, *args, **kwds):
"""Evaluate the estimated pdf on a grid.
Parameters
----------
arg_0,arg_1,... arg_d-1 : vectors
Alternatively, if no vectors is passed in then
arg_i = linspace(self.xmin[i], self.xmax[i], self.inc)
output : string optional
'value' if value output
'data' if object output
Returns
-------
values : array-like
The values evaluated at meshgrid(*args).
"""
return self.eval_grid_fun(self._eval_grid, *args, **kwds)
def _eval_grid(self, *args, **kwds):
pass
def _add_contour_levels(self, wdata):
p_levels = np.r_[10:90:20, 95, 99, 99.9]
try:
c_levels = qlevels(wdata.data, p=p_levels)
wdata.clevels = c_levels
wdata.plevels = p_levels
except ValueError as e:
msg = "Could not calculate contour levels!. ({})".format(str(e))
warnings.warn(msg)
def _make_object(self, f, **kwds):
titlestr = 'Kernel density estimate ({})'.format(self.kernel.name)
kwds2 = dict(title=titlestr)
kwds2['plot_kwds'] = dict(plotflag=1)
kwds2.update(**kwds)
args = self.args
if self.d == 1:
args = args[0]
wdata = PlotData(f, args, **kwds2)
if self.d > 1:
self._add_contour_levels(wdata)
return wdata
def get_args(self, xmin=None, xmax=None):
sxmin = self.xmin
if xmin is not None:
sxmin = np.minimum(xmin, sxmin)
sxmax = self.xmax
if xmax is not None:
sxmax = np.maximum(xmax, sxmax)
args = []
inc = self.inc
for i in range(self.d):
args.append(np.linspace(sxmin[i], sxmax[i], inc))
return args
def eval_grid_fun(self, eval_grd, *args, **kwds):
if len(args) == 0:
args = self.get_args()
self.args = args
output = kwds.pop('output', 'value')
f = eval_grd(*args, **kwds)
if output == 'value':
return f
return self._make_object(f, **kwds)
def _check_shape(self, points):
points = atleast_2d(points)
d, m = points.shape
if d != self.d:
_assert(d == 1 and m == self.d, "points have dimension {}, "
"dataset has dimension {}".format(d, self.d))
# points was passed in as a row vector
points = np.reshape(points, (self.d, 1))
return points
def eval_points(self, points, **kwds):
"""Evaluate the estimated pdf on a set of points.
Parameters
----------
points : (# of dimensions, # of points)-array
Alternatively, a (# of dimensions,) vector can be passed in and
treated as a single point.
Returns
-------
values : (# of points,)-array
The values at each point.
Raises
------
ValueError if the dimensionality of the input points is different than
the dimensionality of the KDE.
"""
points = self._check_shape(points)
return self._eval_points(points, **kwds)
def _eval_points(self, points, **kwds):
pass
__call__ = eval_grid
class TKDE(_KDE):
""" Transformation Kernel-Density Estimator.
Parameters
----------
dataset : (# of dims, # of data)-array
datapoints to estimate from
hs : array-like (optional)
smooting parameter vector/matrix.
(default compute from data using kernel.get_smoothing function)
kernel : kernel function object.
kernel must have get_smoothing method
alpha : real scalar (optional)
sensitivity parameter (default 0 regular KDE)
A good choice might be alpha = 0.5 ( or 1/D)
alpha = 0 Regular KDE (hs is constant)
0 < alpha <= 1 Adaptive KDE (Make hs change)
xmin, xmax : vectors
specifying the default argument range for the kde.eval_grid methods.
For the kde.eval_grid_fast methods the values must cover the range of
the data. (default min(data)-range(data)/4, max(data)-range(data)/4)
If a single value of xmin or xmax is given then the boundary is the is
the same for all dimensions.
inc : scalar integer
defining the default dimension of the output from kde.eval_grid methods
(default 512)
(For kde.eval_grid_fast: A value below 50 is very fast to compute but
may give some inaccuracies. Values between 100 and 500 give very
accurate results)
L2 : array-like
vector of transformation parameters (default 1 no transformation)
t(xi;L2) = xi^L2*sign(L2) for L2(i) ~= 0
t(xi;L2) = log(xi) for L2(i) == 0
If single value of L2 is given then the transformation is the same in
all directions.
Members
-------
d : int
number of dimensions
n : int
number of datapoints
Methods
-------
kde.eval_grid_fast(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_grid(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_points(points) : array
evaluate the estimated pdf on a provided set of points
kde(x0, x1,..., xd) : array
same as kde.eval_grid(x0, x1,..., xd)
Examples
--------
N = 20
data = np.random.rayleigh(1, size=(N,))
>>> data = np.array([
... 0.75355792, 0.72779194, 0.94149169, 0.07841119,2.32291887,
... 1.10419995, 0.77055114, 0.60288273, 1.36883635, 1.74754326,
... 1.09547561, 1.01671133, 0.73211143, 0.61891719, 0.75903487,
... 1.8919469 , 0.72433808, 1.92973094, 0.44749838, 1.36508452])
>>> import wafo.kdetools as wk
>>> x = np.linspace(0.01, max(data.ravel()) + 1, 10)
>>> kde = wk.TKDE(data, hs=0.5, L2=0.5)
>>> f = kde(x)
>>> np.allclose(f,
... [ 1.03982714, 0.45839018, 0.39514782, 0.32860602, 0.26433318,
... 0.20717946, 0.15907684, 0.1201074 , 0.08941027, 0.06574882])
True
>>> np.allclose(kde.eval_grid(x),
... [ 1.03982714, 0.45839018, 0.39514782, 0.32860602, 0.26433318,
... 0.20717946, 0.15907684, 0.1201074 , 0.08941027, 0.06574882])
True
>>> np.allclose(kde.eval_grid_fast(x),
... [ 1.04018924, 0.45838973, 0.39514689, 0.32860532, 0.26433301,
... 0.20717976, 0.15907697, 0.1201077 , 0.08941129, 0.06574899])
True
import matplotlib.pyplot as plt
h1 = plt.plot(x, f) # 1D probability density plot
t = np.trapz(f, x)
"""
def __init__(self, data, hs=None, kernel=None, alpha=0.0,
xmin=None, xmax=None, inc=512, L2=None):
self.L2 = L2
super(TKDE, self).__init__(data, kernel, xmin, xmax)
tdataset = self._transform(self.dataset)
txmin = np.ravel(self._transform(np.reshape(self.xmin, (-1, 1))))
txmax = np.ravel(self._transform(np.reshape(self.xmax, (-1, 1))))
self.tkde = KDE(tdataset, hs, self.kernel, alpha, txmin, txmax, inc)
def _check_xmin(self, xmin):
if self.L2 is not None:
amin = self.dataset.min(axis=-1)
L2 = np.atleast_1d(self.L2) * np.ones(self.d)
xmin = np.where(L2 != 1, np.maximum(xmin, amin / 100.0), xmin)
return xmin
@property
def inc(self):
return self.tkde.inc
@inc.setter
def inc(self, inc):
self.tkde.inc = inc
@property
def hs(self):
return self.tkde.hs
@hs.setter
def hs(self, hs):
self.tkde.hs = hs
def _transform(self, points):
if self.L2 is None:
return points # default no transformation
L2 = np.atleast_1d(self.L2) * np.ones(self.d)
tpoints = copy.copy(points)
for i, v2 in enumerate(L2.tolist()):
tpoints[i] = np.log(points[i]) if v2 == 0 else points[i] ** v2
return tpoints
def _inverse_transform(self, tpoints):
if self.L2 is None:
return tpoints # default no transformation
L2 = np.atleast_1d(self.L2) * np.ones(self.d)
points = copy.copy(tpoints)
for i, v2 in enumerate(L2.tolist()):
points[i] = np.exp(
tpoints[i]) if v2 == 0 else tpoints[i] ** (1.0 / v2)
return points
def _scale_pdf(self, pdf, points):
if self.L2 is None:
return pdf
# default no transformation
L2 = np.atleast_1d(self.L2) * np.ones(self.d)
for i, v2 in enumerate(L2.tolist()):
factor = v2 * np.sign(v2) if v2 else 1
pdf *= np.where(v2 == 1, 1, points[i] ** (v2 - 1) * factor)
_assert_warn((np.abs(np.diff(pdf)).max() < 10).all(), '''
Numerical problems may have occured due to the power transformation.
Check the KDE for spurious spikes''')
return pdf
def _interpolate(self, points, f, *args, **kwds):
ipoints = meshgrid(*args) # if self.d > 1 else args
for i in range(self.d):
points[i].shape = -1,
points = np.asarray(points).T
fi = interpolate.griddata(points, np.ravel(f), tuple(ipoints),
method='linear', fill_value=0.0)
self.args = args
r = kwds.get('r', 0)
if r == 0:
return fi * (fi > 0)
return fi
def _get_targs(self, args):
targs = []
if len(args):
targs0 = self._transform(list(args))
xmin = [min(t) for t in targs0]
xmax = [max(t) for t in targs0]
targs = self.tkde.get_args(xmin, xmax)
return targs
def _eval_grid_fast(self, *args, **kwds):
if self.L2 is None:
f = self.tkde.eval_grid_fast(*args, **kwds)
self.args = self.tkde.args
return f
targs = self._get_targs(args)
tf = self.tkde.eval_grid_fast(*targs)
self.args = self._inverse_transform(list(self.tkde.args))
points = meshgrid(*self.args)
f = self._scale_pdf(tf, points)
if len(args):
return self._interpolate(points, f, *args, **kwds)
return f
def _eval_grid(self, *args, **kwds):
if self.L2 is None:
return self.tkde.eval_grid(*args, **kwds)
targs = self._transform(list(args))
tf = self.tkde.eval_grid(*targs, **kwds)
points = meshgrid(*args)
f = self._scale_pdf(tf, points)
return f
def _eval_points(self, points, **kwds):
"""Evaluate the estimated pdf on a set of points.
Parameters
----------
points : (# of dimensions, # of points)-array
Alternatively, a (# of dimensions,) vector can be passed in and
treated as a single point.
Returns
-------
values : (# of points,)-array
The values at each point.
Raises
------
ValueError if the dimensionality of the input points is different than
the dimensionality of the KDE.
"""
if self.L2 is None:
return self.tkde.eval_points(points)
tpoints = self._transform(points)
tf = self.tkde.eval_points(tpoints)
f = self._scale_pdf(tf, points)
return f
class KDE(_KDE):
""" Kernel-Density Estimator.
Parameters
----------
data : (# of dims, # of data)-array
datapoints to estimate from
hs : array-like (optional)
smooting parameter vector/matrix.
(default compute from data using kernel.get_smoothing function)
kernel : kernel function object.
kernel must have get_smoothing method
alpha : real scalar (optional)
sensitivity parameter (default 0 regular KDE)
A good choice might be alpha = 0.5 ( or 1/D)
alpha = 0 Regular KDE (hs is constant)
0 < alpha <= 1 Adaptive KDE (Make hs change)
xmin, xmax : vectors
specifying the default argument range for the kde.eval_grid methods.
For the kde.eval_grid_fast methods the values must cover the range of
the data.
(default min(data)-range(data)/4, max(data)-range(data)/4)
If a single value of xmin or xmax is given then the boundary is the is
the same for all dimensions.
inc : scalar integer (default 512)
defining the default dimension of the output from kde.eval_grid methods
(For kde.eval_grid_fast: A value below 50 is very fast to compute but
may give some inaccuracies. Values between 100 and 500 give very
accurate results)
Members
-------
d : int
number of dimensions
n : int
number of datapoints
Methods
-------
kde.eval_grid_fast(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_grid(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_points(points) : array
evaluate the estimated pdf on a provided set of points
kde(x0, x1,..., xd) : array
same as kde.eval_grid(x0, x1,..., xd)
Examples
--------
N = 20
data = np.random.rayleigh(1, size=(N,))
>>> data = np.array([
... 0.75355792, 0.72779194, 0.94149169, 0.07841119, 2.32291887,
... 1.10419995, 0.77055114, 0.60288273, 1.36883635, 1.74754326,
... 1.09547561, 1.01671133, 0.73211143, 0.61891719, 0.75903487,
... 1.8919469 , 0.72433808, 1.92973094, 0.44749838, 1.36508452])
>>> x = np.linspace(0, max(data.ravel()) + 1, 10)
>>> import wafo.kdetools as wk
>>> kde = wk.KDE(data, hs=0.5, alpha=0.5)
>>> f = kde(x)
>>> np.allclose(f,
... [ 0.17252055, 0.41014271, 0.61349072, 0.57023834, 0.37198073,
... 0.21409279, 0.12738463, 0.07460326, 0.03956191, 0.01887164])
True
>>> np.allclose(kde.eval_grid(x),
... [ 0.17252055, 0.41014271, 0.61349072, 0.57023834, 0.37198073,
... 0.21409279, 0.12738463, 0.07460326, 0.03956191, 0.01887164])
True
>>> np.allclose(kde.eval_grid_fast(x),
... [ 0.20729484, 0.39865044, 0.53716945, 0.5169322 , 0.39060223,
... 0.26441126, 0.16388801, 0.08388527, 0.03227164, 0.00883579])
True
>>> kde0 = wk.KDE(data, hs=0.5, alpha=0.0)
>>> np.allclose(kde0.eval_points(x),
... [ 0.2039735 , 0.40252503, 0.54595078, 0.52219649, 0.3906213 ,
... 0.26381501, 0.16407362, 0.08270612, 0.02991145, 0.00720821])
True
>>> np.allclose(kde0.eval_grid(x),
... [ 0.2039735 , 0.40252503, 0.54595078, 0.52219649, 0.3906213 ,
... 0.26381501, 0.16407362, 0.08270612, 0.02991145, 0.00720821])
True
>>> f = kde0.eval_grid(x, output='plotobj')
>>> np.allclose(f.data,
... [ 0.2039735 , 0.40252503, 0.54595078, 0.52219649, 0.3906213 ,
... 0.26381501, 0.16407362, 0.08270612, 0.02991145, 0.00720821])
True
>>> f = kde0.eval_grid_fast()
>>> np.allclose(np.interp(x, kde0.args[0], f),
... [ 0.20398034, 0.40252166, 0.54593292, 0.52218993, 0.39062245,
... 0.26381651, 0.16407487, 0.08270847, 0.02991439, 0.00882095])
True
>>> f1 = kde0.eval_grid_fast(output='plot')
>>> np.allclose(np.interp(x, f1.args, f1.data),
... [ 0.20398034, 0.40252166, 0.54593292, 0.52218993, 0.39062245,
... 0.26381651, 0.16407487, 0.08270847, 0.02991439, 0.00882095])
True
h = f1.plot()
import matplotlib.pyplot as plt
h1 = plt.plot(x, f) # 1D probability density plot
t = np.trapz(f, x)
"""
def __init__(self, data, hs=None, kernel=None, alpha=0.0, xmin=None,
xmax=None, inc=512):
super(KDE, self).__init__(data, kernel, xmin, xmax)
self.hs = hs
self.inc = inc
self.alpha = alpha
def _replace_negatives_with_default_hs(self, h):
get_default_hs = self.kernel.get_smoothing
ind, = np.where(h <= 0)
for i in ind.tolist():
h[i] = get_default_hs(self.dataset[i])
def _check_hs(self, h):
"""make sure it has the correct dimension and replace negative vals"""
h = np.atleast_1d(h)
if (len(h.shape) == 1) or (self.d == 1):
h = h * np.ones(self.d) if max(h.shape) == 1 else h.reshape(self.d)
self._replace_negatives_with_default_hs(h)
return h
def _invert_hs(self, h):
if (len(h.shape) == 1) or (self.d == 1):
determinant = h.prod()
inv_hs = np.diag(1.0 / h)
else: # fully general smoothing matrix
determinant = linalg.det(h)
_assert(0 < determinant,
'bandwidth matrix h must be positive definit!')
inv_hs = linalg.inv(h)
return inv_hs, determinant
@property
def hs(self):
return self._hs
@hs.setter
def hs(self, h):
if h is None:
h = self.kernel.get_smoothing(self.dataset)
h = self._check_hs(h)
# pylint: disable=attribute-defined-outside-init
self._inv_hs, deth = self._invert_hs(h)
self._norm_factor = deth * self.n
self._hs = h
@property
def inc(self):
return self._inc
@inc.setter
def inc(self, inc):
if inc is None:
_tau, tau = self.kernel.effective_support()
xyzrange = 8 * self.sigma
L1 = 10
inc = max(48, (L1 * xyzrange / (tau * self.hs)).max())
inc = 2 ** nextpow2(inc)
# pylint: disable=attribute-defined-outside-init
self._inc = inc
@property
def alpha(self):
return self._alpha
@alpha.setter
def alpha(self, alpha):
# pylint: disable=attribute-defined-outside-init
self._alpha = alpha
self._lambda = np.ones(self.n)
if alpha > 0:
f = self.eval_points(self.dataset) # pilot estimate
g = np.exp(np.mean(np.log(f)))
self._lambda = (f / g) ** (-alpha)
@staticmethod
def _make_flat_grid(dx, d, inc):
Xn = []
x0 = np.linspace(-inc, inc, 2 * inc + 1)
for i in range(d):
Xn.append(x0[:-1] * dx[i])
Xnc = meshgrid(*Xn)
for i in range(d):
Xnc[i].shape = (-1,)
return np.vstack(Xnc)
def _kernel_weights(self, Xn, dx, d, inc):
kw = self.kernel(Xn)
norm_fact0 = (kw.sum() * dx.prod() * self.n)
norm_fact = (self._norm_factor * self.kernel.norm_factor(d, self.n))
if np.abs(norm_fact0 - norm_fact) > 0.05 * norm_fact:
warnings.warn(
'Numerical inaccuracy due to too low discretization. ' +
'Increase the discretization of the evaluation grid ' +
'(inc={})!'.format(inc))
norm_fact = norm_fact0
kw = kw / norm_fact
return kw
def _eval_grid_fast(self, *args, **kwds):
X = np.vstack(args)
d, inc = X.shape
dx = X[:, 1] - X[:, 0]
Xnc = self._make_flat_grid(dx, d, inc)
Xn = np.dot(self._inv_hs, Xnc)
kw = self._kernel_weights(Xn, dx, d, inc)
r = kwds.get('r', 0)
if r != 0:
fun = self._moment_fun(r)
kw *= fun(np.vstack(Xnc))
kw.shape = (2 * inc, ) * d
kw = np.fft.ifftshift(kw)
y = kwds.get('y', 1.0)
if self.alpha > 0:
warnings.warn('alpha parameter is not used for binned kde!')
# Find the binned kernel weights, c.
c = gridcount(self.dataset, X, y=y)
# Perform the convolution.
z = np.real(ifftn(fftn(c, s=kw.shape) * fftn(kw)))
ix = (slice(0, inc),) * d
if r == 0:
return z[ix] * (z[ix] > 0.0)
return z[ix]
def _eval_grid(self, *args, **kwds):
grd = meshgrid(*args)
shape0 = grd[0].shape
d = len(grd)
for i in range(d):
grd[i] = grd[i].ravel()
f = self.eval_points(np.vstack(grd), **kwds)
return f.reshape(shape0)
def _moment_fun(self, r):
if r == 0:
return lambda x: 1
return lambda x: (x ** r).sum(axis=0)
@property
def norm_factor(self):
return self._norm_factor * self.kernel.norm_factor(self.d, self.n)
def _loop_over_data(self, data, points, y, r):
fun = self._moment_fun(r)
d, m = points.shape
inv_hs, lambda_ = self._inv_hs, self._lambda
kernel = self.kernel
y_d_lambda = y / lambda_ ** d
result = np.zeros((m,))
for i in range(self.n):
dxi = points - data[:, i, np.newaxis]
tdiff = np.dot(inv_hs / lambda_[i], dxi)
result += fun(dxi) * kernel(tdiff) * y_d_lambda[i]
return result / self.norm_factor
def _loop_over_points(self, data, points, y, r):
fun = self._moment_fun(r)
d, m = points.shape
inv_hs, lambda_ = self._inv_hs, self._lambda
kernel = self.kernel
y_d_lambda = y / lambda_ ** d
result = np.zeros((m,))
for i in range(m):
dxi = points[:, i, np.newaxis] - data
tdiff = np.dot(inv_hs, dxi / lambda_[np.newaxis, :])
result[i] = np.sum(fun(dxi) * kernel(tdiff) * y_d_lambda, axis=-1)
return result / self.norm_factor
def _eval_points(self, points, **kwds):
"""Evaluate the estimated pdf on a set of points.
Parameters
----------
points : (# of dimensions, # of points)-array
Alternatively, a (# of dimensions,) vector can be passed in and
treated as a single point.
Returns
-------
values : (# of points,)-array
The values at each point.
Raises
------
ValueError if the dimensionality of the input points is different than
the dimensionality of the KDE.
"""
d, m = points.shape
_assert(d == self.d, "d={} expected, got {}".format(self.d, d))
y = kwds.get('y', 1)
r = kwds.get('r', 0)
more_points_than_data = m >= self.n
if more_points_than_data:
return self._loop_over_data(self.dataset, points, y, r)
return self._loop_over_points(self.dataset, points, y, r)
class KRegression(object):
""" Kernel-Regression
Parameters
----------
data : (# of dims, # of data)-array
datapoints to estimate from
y : # of data - array
response variable
p : scalar integer (0 or 1)
Nadaraya-Watson estimator if p=0,
local linear estimator if p=1.
hs : array-like (optional)
smooting parameter vector/matrix.
(default compute from data using kernel.get_smoothing function)
kernel : kernel function object.
kernel must have get_smoothing method
alpha : real scalar (optional)
sensitivity parameter (default 0 regular KDE)
A good choice might be alpha = 0.5 ( or 1/D)
alpha = 0 Regular KDE (hs is constant)
0 < alpha <= 1 Adaptive KDE (Make hs change)
xmin, xmax : vectors
specifying the default argument range for the kde.eval_grid methods.
For the kde.eval_grid_fast methods the values must cover the range of
the data. (default min(data)-range(data)/4, max(data)-range(data)/4)
If a single value of xmin or xmax is given then the boundary is the is
the same for all dimensions.
inc : scalar integer (default 128)
defining the default dimension of the output from kde.eval_grid methods
(For kde.eval_grid_fast: A value below 50 is very fast to compute but
may give some inaccuracies. Values between 100 and 500 give very
accurate results)
Members
-------
d : int
number of dimensions
n : int
number of datapoints
Methods
-------
kde.eval_grid_fast(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_grid(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_points(points) : array
evaluate the estimated pdf on a provided set of points
kde(x0, x1,..., xd) : array
same as kde.eval_grid(x0, x1,..., xd)
References
----------
M.P. Wand and M.C. Jones (1995)
Kernel smoothing, Eq. 5.4, pp. 119 and appendix D4 pp 189
Examples
--------
>>> import wafo.kdetools as wk
>>> N = 100
>>> x = np.linspace(0, 1, N)
>>> ei = np.random.normal(loc=0, scale=0.075, size=(N,))
>>> ei = np.sqrt(0.075) * np.sin(100*x)
>>> y = 2*np.exp(-x**2/(2*0.3**2))+3*np.exp(-(x-1)**2/(2*0.7**2)) + ei
>>> kreg = wk.KRegression(x, y)
>>> f = kreg(output='plotobj', title='Kernel regression', plotflag=1)
>>> np.allclose(f.data[:5],
... [ 3.18670593, 3.18678088, 3.18682196, 3.18682932, 3.18680337])
True
h = f.plot(label='p=0')
"""
def __init__(self, data, y, p=0, hs=None, kernel=None, alpha=0.0,
xmin=None, xmax=None, inc=128, L2=None):
self.tkde = TKDE(data, hs=hs, kernel=kernel,
alpha=alpha, xmin=xmin, xmax=xmax, inc=inc, L2=L2)
self.y = np.atleast_1d(y)
self.p = p
self._grdfun = None
def eval_grid_fast(self, *args, **kwds):
self._grdfun = self.tkde.eval_grid_fast
return self.tkde.eval_grid_fun(self._eval_gridfun, *args, **kwds)
def eval_grid(self, *args, **kwds):
self._grdfun = self.tkde.eval_grid
return self.tkde.eval_grid_fun(self._eval_gridfun, *args, **kwds)
def _eval_gridfun(self, *args, **kwds):
"""
References
----------
M.P. Wand and M.C. Jones (1995)
Kernel smoothing, Eq. 5.4, pp. 119 and appendix D4 pp 189
"""
grdfun = self._grdfun
s0 = grdfun(*args, r=0)
t0 = grdfun(*args, r=0, y=self.y) # TODO: Check accuracy. Culprit fft/ifft?
if self.p == 0:
return (t0 / (s0 + _TINY)).clip(min=-_REALMAX, max=_REALMAX)
elif self.p == 1:
s1 = grdfun(*args, r=1)
s2 = grdfun(*args, r=2)
t1 = grdfun(*args, r=1, y=self.y)
return ((s2 * t0 - s1 * t1) /
(s2 * s0 - s1 ** 2)).clip(min=-_REALMAX, max=_REALMAX)
__call__ = eval_grid_fast
class BKRegression(object):
'''
Kernel-Regression on binomial data
method : {'beta', 'wilson'}
method is one of the following
'beta', return Bayesian Credible interval using beta-distribution.
'wilson', return Wilson score interval
a, b : scalars
parameters of the beta distribution defining the apriori distribution
of p, i.e., the Bayes estimator for p: p = (y+a)/(n+a+b).
Setting a=b=0.5 gives Jeffreys interval.
'''
def __init__(self, data, y, method='beta', a=0.05, b=0.05, p=0, hs_e=None,
hs=None, kernel=None, alpha=0.0, xmin=None, xmax=None,
inc=128, L2=None):
self.method = method
self.a = max(a, _TINY)
self.b = max(b, _TINY)
self.kreg = KRegression(data, y, p=p, hs=hs, kernel=kernel,
alpha=alpha, xmin=xmin, xmax=xmax, inc=inc,
L2=L2)
# defines bin width (i.e. smoothing) in empirical estimate
self.hs_e = hs_e
@property
def hs_e(self):
return self._hs_e
@hs_e.setter
def hs_e(self, hs_e):
if hs_e is None:
hs1 = self._get_max_smoothing('hste')[0]
hs2 = self._get_max_smoothing('hos')[0]
hs_e = sqrt(hs1 * hs2)
# pylint: disable=attribute-defined-outside-init
self._hs_e = hs_e
def _set_smoothing(self, hs):
self.kreg.tkde.hs = hs
x = property(fget=lambda cls: cls.kreg.tkde.dataset.squeeze())
y = property(fget=lambda cls: cls.kreg.y)
kernel = property(fget=lambda cls: cls.kreg.tkde.kernel)
hs = property(fset=_set_smoothing, fget=lambda cls: cls.kreg.tkde.hs)
def _get_max_smoothing(self, fun=None):
"""Return maximum value for smoothing parameter."""
x = self.x
y = self.y
if fun is None:
get_smoothing = self.kernel.get_smoothing
else:
get_smoothing = getattr(self.kernel, fun)
hs1 = get_smoothing(x)
# hx = np.median(np.abs(x-np.median(x)))/0.6745*(4.0/(3*n))**0.2
if (y == 1).any():
hs2 = get_smoothing(x[y == 1])
# hy = np.median(np.abs(y-np.mean(y)))/0.6745*(4.0/(3*n))**0.2
else:
hs2 = 4 * hs1
# hy = 4*hx
hopt = sqrt(hs1 * hs2)
return hopt, hs1, hs2
def get_grid(self, hs_e=None):
if hs_e is None:
hs_e = self.hs_e
x = self.x
xmin, xmax = x.min(), x.max()
ni = max(2 * int((xmax - xmin) / hs_e) + 3, 5)
sml = hs_e # *0.1
xi = np.linspace(xmin - sml, xmax + sml, ni).T
return xi
def _wilson_score(self, n, p, alpha):
"""The Wilson score interval is an improvement over the normal approximation
interval in that the actual coverage probability is closer to the nominal value
for the Binomial proportion confidence interval.
See https://en.wikipedia.org/wiki/Binomial_proportion_confidence_interval
"""
z0 = -_invnorm(alpha / 2)
# xc = (p + 0.5 * z0 ** 2. / n) / (1+z0 ** 2. / n)
# halfwidth = (z0 * sqrt((p * (1 - p) / n) + (z0 ** 2 / (4 * (n ** 2))))) / (1+z0 ** 2. / n)
# or alteratively the numerically more stable solution:
# xc = (p*n/z0**2 + 0.5 ) / (1+n/z0 ** 2.)
# halfwidth = sqrt(n/z0**2*p * (1 - p) + 0.25) / (1+n/z0 ** 2.)
nz02 = n / z0 ** 2.
xc = (p*nz02 + 0.5 ) / (1+nz02)
halfwidth = sqrt(nz02 * p * (1 - p) + 0.25) / (1+nz02)
plo = xc - halfwidth.clip(max=1.0)
pup = xc + halfwidth.clip(max=1.0) # wilson score
return plo, pup
def _credible_interval(self, n, p, alpha):
"""
Bayesian credible interval.
In particular if a=b=0.5 it returns the Jeffreys intervall.
The Jeffreys interval is the Bayesian credible interval obtained when
using the non-informative Jeffreys prior for the binomial proportion p.
The Jeffreys prior for this problem is a Beta distribution with
parameters (1/2, 1/2), it is a conjugate prior.
Here the prior is
beta.isf(alpha/2, y+a, n-y+b)
where y = n*p and n-y = n*(1-p)
"""
a, b = self.a, self.b
pup = np.where(p == 1, 1,
st.beta.isf(alpha / 2, n * p + a, n * (1 - p) + b))
plo = np.where(p == 0, 0,
st.beta.isf(1 - alpha / 2, n * p + a, n * (1 - p) + b))
return plo, pup
def prb_ci(self, n, p, alpha=0.05):
"""Return Confidence Interval for the binomial probability p.
Parameters
----------
n : array-like
number of Bernoulli trials
p : array-like
estimated probability of success in each trial
alpha : scalar
confidence level
"""
if self.method.startswith('w'):
return self._wilson_score(n, p, alpha)
return self._credible_interval(n, p, alpha)
def prb_empirical(self, xi=None, hs_e=None, alpha=0.05, color='r', **kwds):
"""Returns empirical binomial probabiltity.
Parameters
----------
x : ndarray
position vector
y : ndarray
binomial response variable (zeros and ones)
alpha : scalar
confidence level
color:
used in plot
Returns
-------
P(x) : PlotData object
empirical probability
"""
if xi is None:
xi = self.get_grid(hs_e)
x = self.x
y = self.y
c = gridcount(x, xi) # + self.a + self.b # count data
if np.any(y == 1):
c0 = gridcount(x[y == 1], xi) # + self.a # count success
else:
c0 = np.zeros(np.shape(xi))
prb = np.where(c == 0, 0, c0 / (c + _TINY)) # assume prb==0 for c==0
CI = np.vstack(self.prb_ci(c, prb, alpha))
prb_e = PlotData(prb, xi, plotmethod='plot', plot_args=['.'],
plot_kwds=dict(markersize=6, color=color, picker=5))
prb_e.dataCI = CI.T
prb_e.count = c
return prb_e
def prb_smoothed(self, prb_e, hs, alpha=0.05, color='r', label=''):
"""Return smoothed binomial probability.
Parameters
----------
prb_e : PlotData object with empirical binomial probabilites
hs : smoothing parameter
alpha : confidence level
color : color of plot object
label : label for plot object
"""
x_e = prb_e.args
n_e = len(x_e)
dx_e = x_e[1] - x_e[0]
n = self.x.size
x_s = np.linspace(x_e[0], x_e[-1], 10 * n_e + 1).ravel()
self.hs = hs
prb_s = self.kreg(x_s, output='plotobj', title='', plot_kwds=dict(
color=color, linewidth=2)) # dict(plotflag=7))
m_nan = np.isnan(prb_s.data)
if m_nan.any(): # assume 0/0 division
prb_s.data[m_nan] = 0.0
prb_s.data = prb_s.data.clip(min=0, max=1.0)
# prb_s.data[np.isnan(prb_s.data)] = 0
# expected number of data in each bin
c_s = self.kreg.tkde.eval_grid_fast(x_s) * dx_e * n
plo, pup = self.prb_ci(c_s, prb_s.data, alpha)
prb_s.dataCI = np.vstack((plo, pup)).T
prb_s.prediction_error_avg = (np.trapz(pup - plo, x_s) /
(x_s[-1] - x_s[0]))
if label:
prb_s.plot_kwds['label'] = label
prb_s.children = [PlotData([plo, pup], x_s,
plotmethod='fill_between',
plot_kwds=dict(alpha=0.2, color=color)),
prb_e]
p_e = prb_e.eval_points(x_s)
p_s = prb_s.data
dp_s = np.sign(np.diff(p_s))
k = (dp_s[:-1] != dp_s[1:]).sum() # numpeaks
logit_pup = _logit(pup)
logit_plo = _logit(plo)
logit_p_e = _logit(p_e)
s1, s2 = 1, 1
sigmai = logit_pup - logit_plo + _EPS
aicc = (linalg.norm((logit_p_e - _logit(p_s)) / sigmai, ord=s1) +
2 * k * (k + 1) / np.maximum(n_e - k + 1, 1) + # numpeaks**2
linalg.norm(((logit_p_e - logit_pup).clip(min=0) -
(logit_p_e - logit_plo).clip(max=0))/sigmai, ord=s2)) # empirical p_e outside of [plo, pup]
prb_s.aicc = aicc
return prb_s
def prb_search_best(self, prb_e=None, hsvec=None, hsfun='hste',
alpha=0.05, color='r', label=''):
"""Return best smoothed binomial probability.
Parameters
----------
prb_e : PlotData object with empirical binomial probabilites
hsvec : arraylike (default np.linspace(hsmax*0.1, hsmax, 55))
vector smoothing parameters
hsfun :
method for calculating hsmax
"""
if prb_e is None:
prb_e = self.prb_empirical(alpha=alpha, color=color)
if hsvec is None:
hsmax = max(self._get_max_smoothing(hsfun)[0], self.hs_e)
hsvec = np.linspace(hsmax * 0.1, hsmax, 55).ravel()
hs_best = hsvec[-1] + 0.1
prb_best = self.prb_smoothed(prb_e, hs_best, alpha, color, label)
aicc = np.zeros(np.size(hsvec))
for i, hi in enumerate(hsvec):
f = self.prb_smoothed(prb_e, hi, alpha, color, label)
aicc[i] = f.aicc
if f.aicc <= prb_best.aicc:
prb_best = f
hs_best = hi
prb_best.score = PlotData(aicc, hsvec)
prb_best.hs = hs_best
self._set_smoothing(hs_best)
return prb_best
if __name__ == '__main__':
test_docstrings(__file__)