# kinisi/diffusion.py
"""
The modules is focused on tools for the evaluation of the mean squared displacement and resulting
diffusion coefficient from a material.
"""
# Copyright (c) Andrew R. McCluskey and Benjamin J. Morgan
# Distributed under the terms of the MIT License
# author: Andrew R. McCluskey (arm61)
import warnings
from typing import List, Union, Any
import numpy as np
from scipy.stats import normaltest, linregress, multivariate_normal
from scipy.linalg import pinvh
from scipy.optimize import minimize, curve_fit
import scipy.constants as const
import tqdm
from uravu.distribution import Distribution
from sklearn.utils import resample
from emcee import EnsembleSampler
from statsmodels.stats.correlation_tools import cov_nearest
# Mapping from a dimension label (accepted as either str or bytes, for
# compatibility with HDF5-style deserialisation) to the slice that selects
# the matching Cartesian axes from the last dimension of a displacement array.
DIMENSIONALITY = {
    key: axes
    for label, axes in [
        ('x', np.s_[0]),
        ('y', np.s_[1]),
        ('z', np.s_[2]),
        ('xy', np.s_[:2]),
        ('xz', np.s_[::2]),
        ('yz', np.s_[1:]),
        ('xyz', np.s_[:]),
    ]
    for key in (label, label.encode())
}
class Bootstrap:
    """
    The top-level class for bootstrapping.

    :param delta_t: An array of the timestep values.
    :param disp_3d: A list of arrays, where each array has the axes [atom, displacement observation, dimension].
        There is one array in the list for each delta_t value. Note: it is necessary to use a list of arrays as
        the number of observations is not necessarily the same at each data point.
    :param n_o: Number of statistically independent observations of the MSD at each timestep.
    :param sub_sample_dt: The frequency in observations to be sampled. Optional, default is :py:attr:`1` (every
        observation).
    :param dimension: Dimension/s to find the displacement along, this should be some subset of `'xyz'`
        indicating the axes of interest. Optional, defaults to `'xyz'`.
    """

    def __init__(self,
                 delta_t: np.ndarray,
                 disp_3d: List[np.ndarray],
                 n_o: np.ndarray,
                 sub_sample_dt: int = 1,
                 dimension: str = 'xyz'):
        self._displacements = disp_3d[::sub_sample_dt]
        self._delta_t = np.array(delta_t[::sub_sample_dt])
        self._max_obs = self._displacements[0].shape[1]
        self._distributions = []
        self._dt = np.array([])
        self._n = np.array([])
        self._s = np.array([])
        self._v = np.array([])
        self._n_bootstrap = np.array([])
        self._s_bootstrap = np.array([])
        self._v_bootstrap = np.array([])
        self._n_o = n_o
        self._ngp = np.array([])
        self._sub_sample_dt = sub_sample_dt
        self._dimension = dimension
        self._euclidian_displacements = []
        self._diffusion_coefficient = None
        self._jump_diffusion_coefficient = None
        self._sigma = None
        self._intercept = None
        self.gradient = None
        self.flatchain = None
        self._covariance_matrix = None
        self._slice = DIMENSIONALITY[dimension.lower()]
        self._start_dt = None
        self.dims = len(dimension.lower())
        self._model = None
        self._cond_max = None

    def to_dict(self) -> dict:
        """
        :return: Dictionary description of the :py:class:`Bootstrap`.
        """
        my_dict = {
            'displacements': self._displacements,
            'delta_t': self._delta_t,
            'n_o': self._n_o,
            'max_obs': self._max_obs,
            'dt': self._dt,
            'n': self._n,
            's': self._s,
            'v': self._v,
            'n_bootstrap': self._n_bootstrap,
            's_bootstrap': self._s_bootstrap,
            'v_bootstrap': self._v_bootstrap,
            'sub_sample_dt': self._sub_sample_dt,
            'dimension': self._dimension,
            'ngp': self._ngp,
            'covariance_matrix': self._covariance_matrix,
            'distributions': None,
            'diffusion_coefficient': None,
            'jump_diffusion_coefficient': None,
            'sigma': None,
            'intercept': None,
            'gradient': None,
            'start_dt': self._start_dt,
            'model': self._model,
            'cond_max': self._cond_max
        }
        # The Euclidian displacements are populated by the subclasses whether or
        # not bootstrap resampling was used, and from_dict reads this key
        # unconditionally, so it must always be serialised (previously it was
        # only written when the distributions were non-empty, which caused a
        # KeyError on round-tripping a non-bootstrapped object).
        my_dict['euclidian_displacements'] = [d.to_dict() for d in self._euclidian_displacements]
        if len(self._distributions) != 0:
            my_dict['distributions'] = [d.to_dict() for d in self._distributions]
        if self._diffusion_coefficient is not None:
            my_dict['diffusion_coefficient'] = self._diffusion_coefficient.to_dict()
        if self._jump_diffusion_coefficient is not None:
            my_dict['jump_diffusion_coefficient'] = self._jump_diffusion_coefficient.to_dict()
        if self._sigma is not None:
            my_dict['sigma'] = self._sigma.to_dict()
        if self._intercept is not None:
            my_dict['intercept'] = self._intercept.to_dict()
        if self.gradient is not None:
            my_dict['gradient'] = self.gradient.to_dict()
        my_dict['flatchain'] = self.flatchain
        return my_dict

    @classmethod
    def from_dict(cls, my_dict: dict) -> 'Bootstrap':
        """
        Generate a :py:class:`Bootstrap` object from a dictionary.

        :param my_dict: The input dictionary.
        :return: New :py:class`Bootstrap` object.
        """
        # NOTE(review): the stored displacements/delta_t were already
        # sub-sampled before serialisation, so cls(...) re-applies
        # sub_sample_dt here; the statistics are overwritten from the
        # dictionary below, but _displacements/_delta_t keep the doubly
        # sub-sampled view — confirm this is intended.
        boot = cls(my_dict['delta_t'],
                   my_dict['displacements'],
                   my_dict['n_o'],
                   sub_sample_dt=my_dict['sub_sample_dt'],
                   dimension=my_dict['dimension'])
        boot._max_obs = my_dict['max_obs']
        boot._euclidian_displacements = [Distribution.from_dict(d) for d in my_dict['euclidian_displacements']]
        boot._dt = my_dict['dt']
        boot._n = my_dict['n']
        boot._s = my_dict['s']
        boot._v = my_dict['v']
        boot._n_bootstrap = my_dict['n_bootstrap']
        boot._s_bootstrap = my_dict['s_bootstrap']
        boot._v_bootstrap = my_dict['v_bootstrap']
        boot._n_o = my_dict['n_o']
        boot._ngp = my_dict['ngp']
        if my_dict['distributions'] is not None:
            boot._distributions = [Distribution.from_dict(d) for d in my_dict['distributions']]
        if my_dict['diffusion_coefficient'] is not None:
            boot._diffusion_coefficient = Distribution.from_dict(my_dict['diffusion_coefficient'])
        if my_dict['jump_diffusion_coefficient'] is not None:
            boot._jump_diffusion_coefficient = Distribution.from_dict(my_dict['jump_diffusion_coefficient'])
        if my_dict['sigma'] is not None:
            boot._sigma = Distribution.from_dict(my_dict['sigma'])
        if my_dict['intercept'] is not None:
            boot._intercept = Distribution.from_dict(my_dict['intercept'])
        if my_dict['gradient'] is not None:
            boot.gradient = Distribution.from_dict(my_dict['gradient'])
        boot.flatchain = my_dict['flatchain']
        boot._covariance_matrix = my_dict['covariance_matrix']
        boot._start_dt = my_dict['start_dt']
        boot._model = my_dict['model']
        boot._cond_max = my_dict['cond_max']
        return boot

    @property
    def dt(self) -> np.ndarray:
        """
        :return: Timestep values that were resampled.
        """
        return self._dt

    @property
    def n(self) -> np.ndarray:
        """
        :return: The mean MSD/MSTD/MSCD, as determined from the bootstrap resampling process, in units Å:sup:`2`.
        """
        return self._n

    @property
    def s(self) -> np.ndarray:
        """
        :return: The MSD/MSTD/MSCD standard deviation, as determined from the bootstrap resampling process, in
            units Å:sup:`2`.
        """
        return self._s

    @property
    def v(self) -> np.ndarray:
        """
        :return: The MSD/MSTD/MSCD variance as determined from the bootstrap resampling process, in units Å:sup:`4`.
        """
        return self._v

    @property
    def euclidian_displacements(self) -> List[Distribution]:
        """
        :return: Displacements between particles at each dt.
        """
        return self._euclidian_displacements

    @property
    def ngp(self) -> np.ndarray:
        """
        :return: Non-Gaussian parameter as a function of :py:attr:`dt`.
        """
        return self._ngp

    @property
    def n_i(self) -> np.ndarray:
        """
        :return: The number of independent observations as a function of :py:attr:`dt`.
        """
        # Previously returned the never-assigned attribute self._n_i, which
        # raised an AttributeError; the counts are stored in self._n_o.
        return self._n_o

    @property
    def intercept(self) -> Union[Distribution, None]:
        """
        :return: The estimated intercept. Note that if :py:attr:`fit_intercept` is :py:attr:`False` in the
            relevant method call, then this is :py:attr:`None`
        """
        return self._intercept

    @property
    def covariance_matrix(self) -> np.ndarray:
        """
        :return: The covariance matrix for the trajectories.
        """
        return self._covariance_matrix

    @staticmethod
    def iterator(progress: bool, loop: Union[list, range]) -> Union[tqdm.std.tqdm, range]:
        """
        Get the iteration object, using :py:mod:`tqdm` as appropriate.

        :param progress: Should :py:mod:`tqdm` be used to give a progress bar.
        :param loop: The object that should be looped over.
        :return: Iterator object.
        """
        if progress:
            return tqdm.tqdm(loop, desc='Finding Means and Variances')
        return loop

    @staticmethod
    def sample_until_normal(array: np.ndarray,
                            n_samples: float,
                            n_resamples: int,
                            max_resamples: int,
                            alpha: float = 1e-3,
                            random_state: np.random.mtrand.RandomState = None) -> Distribution:
        """
        Resample from the distribution until a normal distribution is obtained or a maximum is reached.

        Args:
        :param array: The array to sample from.
        :param n_samples: Number of samples.
        :param n_resamples: Number of resamples to perform initially.
        :param max_resamples: The maximum number of resamples to perform.
        :param alpha: Level that the p-value from :py:func:`scipy.stats.normaltest` must exceed for the
            distribution to be accepted as normal. Optional, default is :py:attr:`1e-3`.
        :param random_state: A :py:attr:`RandomState` object to be used to ensure reproducibility. Optional,
            default is :py:attr:`None`.
        :return: The resampled distribution.
        """
        # values = _bayesian_bootstrap(array, n_samples, n_resamples, random_state)
        values = _bootstrap(array, n_samples, n_resamples, random_state)
        p_value = normaltest(values)[1]
        # normaltest's null hypothesis is normality, so p_value < alpha means
        # the distribution is not yet normal and more resamples are drawn.
        while p_value < alpha and len(values) < max_resamples:
            # values += _bayesian_bootstrap(array, n_samples, 100, random_state)
            values += _bootstrap(array, n_samples, 100, random_state)
            p_value = normaltest(values)[1]
        if len(values) >= max_resamples:
            warnings.warn("The maximum number of resamples has been reached, and the distribution is not yet normal.")
        return Distribution(values)

    @staticmethod
    def ngp_calculation(d_squared: np.ndarray) -> float:
        """
        Determine the non-Gaussian parameter, from S. Song et al, "Transport dynamics of complex fluids"
        (2019): 10.1073/pnas.1900239116

        :param d_squared: Squared displacement values.
        :return: Value of non-Gaussian parameter.
        """
        top = np.mean(d_squared * d_squared) * 3
        bottom = np.square(np.mean(d_squared)) * 5
        return top / bottom - 1

    def bootstrap_GLS(self,
                      start_dt: float,
                      cond_max: float = 1e16,
                      model: bool = False,
                      fit_intercept: bool = True,
                      n_samples: int = 1000,
                      n_walkers: int = 32,
                      n_burn: int = 500,
                      thin: int = 10,
                      progress: bool = True,
                      random_state: np.random.mtrand.RandomState = None):
        """
        Use the covariance matrix estimated from the resampled values to estimate the gradient and intercept
        using a generalised least squares approach.

        :param start_dt: The starting time for the analysis to find the diffusion coefficient.
            This should be the start of the diffusive regime in the simulation.
        :param cond_max: The maximum condition number of the covariance matrix, in the event that the diffusion
            coefficient appears very wrong (from plotting), this should be increased until it looks reasonable.
            For more information, about the poor condition of estimated covariance matricies, have a look
            at doi:10.1080/16000870.2019.1696646.
        :param model: Use the model for the covariance matrix, if :py:attr:`True` a model for a random walk
            will be used to generate that covariance matrix. Optional, default is :py:attr:`False`.
        :param fit_intercept: Should the intercept of the diffusion relationship be fit. Optional, default
            is :py:attr:`True`.
        :param n_samples: Number of samples of the Gaussian process to perform. Optional, default is :py:attr:`1000`.
        :param n_walkers: Number of MCMC walkers to use. Optional, default is :py:attr:`32`.
        :param n_burn: Number of burn in samples (these allow the sampling to settle). Optional, default
            is :py:attr:`500`.
        :param thin: Use only every :py:attr:`thin` samples for the MCMC sampler. Optional, default is :py:attr:`10`.
        :param progress: Show tqdm progress for sampling. Optional, default is :py:attr:`True`.
        :param random_state: A :py:attr:`RandomState` object to be used to ensure reproducibility. Optional,
            default is :py:attr:`None`.
        """
        if random_state is not None:
            np.random.seed(random_state.get_state()[1][1])
        self._start_dt = start_dt
        self._model = model
        self._cond_max = cond_max
        diff_regime = np.argwhere(self._dt >= self._start_dt)[0][0]
        self._covariance_matrix = self.generate_covariance_matrix(diff_regime)
        # Constant part of the multivariate-normal log-likelihood: log|Sigma| + N log(2 pi).
        _, logdet = np.linalg.slogdet(self._covariance_matrix)
        logdet += np.log(2 * np.pi) * self._n[diff_regime:].size
        inv = pinvh(self._covariance_matrix)

        def log_likelihood(theta: np.ndarray) -> float:
            """
            Get the log likelihood for multivariate normal distribution.
            :param theta: Value of the gradient and intercept of the straight line.
            :return: Log-likelihood value.
            """
            # A negative gradient is unphysical for a diffusive process.
            if theta[0] < 0:
                return -np.inf
            model = _straight_line(self._dt[diff_regime:], *theta)
            diff = (model - self._n[diff_regime:])
            logl = -0.5 * (logdet + np.matmul(diff.T, np.matmul(inv, diff)))
            return logl

        # Ordinary least squares provides the starting point for the
        # maximum-likelihood optimisation.
        ols = linregress(self._dt[diff_regime:], self._n[diff_regime:])
        slope = ols.slope
        intercept = 1e-20
        if slope < 0:
            slope = 1e-20

        def nll(*args) -> float:
            """
            General purpose negative log-likelihood.

            :return: Negative log-likelihood
            """
            return -log_likelihood(*args)

        if fit_intercept:
            max_likelihood = minimize(nll, np.array([slope, intercept])).x
        else:
            max_likelihood = minimize(nll, np.array([slope])).x
        # Initialise the walkers in a small Gaussian ball around the
        # maximum-likelihood estimate.
        pos = max_likelihood + max_likelihood * 1e-3 * np.random.randn(n_walkers, max_likelihood.size)
        sampler = EnsembleSampler(*pos.shape, log_likelihood)
        # Waiting on https://github.com/dfm/emcee/pull/376
        # if random_state is not None:
        #     pos = max_likelihood + max_likelihood * 1e-3 * random_state.randn(n_walkers, max_likelihood.size)
        #     sampler._random = random_state
        sampler.run_mcmc(pos, n_samples + n_burn, progress=progress, progress_kwargs={'desc': "Likelihood Sampling"})
        self.flatchain = sampler.get_chain(flat=True, thin=thin, discard=n_burn)
        self.gradient = Distribution(self.flatchain[:, 0])
        self._intercept = None
        if fit_intercept:
            self._intercept = Distribution(self.flatchain[:, 1])

    def generate_covariance_matrix(self, diff_regime: int):
        """
        Generate the covariance matrix, including the modelling and finding the closest matrix
        that is positive definite.

        :param diff_regime: The index of the point where the analysis should begin.
        :return: Modelled covariance matrix that is positive definite.
        """

        def _model_variance(dt: np.ndarray, a: float) -> np.ndarray:
            """
            Determine the model variance, based on a quadratic relationship with the number of
            independent samples as a divisor.

            :param dt: Timestep value
            :param a: Quadratic coefficient
            :return: Model variances
            """
            return a / self._n_o[diff_regime:] * dt**2

        if self._model:
            self._popt, _ = curve_fit(_model_variance, self.dt[diff_regime:], self._v[diff_regime:], bounds=(0, np.inf))
            self._model_v = _model_variance(self.dt[diff_regime:], *self._popt)
        else:
            self._model_v = self._v[diff_regime:]
        self._covariance_matrix = _populate_covariance_matrix(self._model_v, self._n_o[diff_regime:])
        # Keep the (possibly non-positive-definite) matrix for inspection
        # before reconditioning.
        self._npd_covariance_matrix = self._covariance_matrix
        return cov_nearest(minimum_eigenvalue_method(self._covariance_matrix, self._cond_max))

    def diffusion(self, start_dt: float, **kwargs):
        """
        Use the bootstrap-GLS method to determine the diffusivity for the system. Keyword arguments will be
        passed on to the :py:func:`bootstrap_GLS` method.

        :param start_dt: The starting time for the analysis to find the diffusion coefficient.
            This should be the start of the diffusive regime in the simulation.
        """
        self.bootstrap_GLS(start_dt, **kwargs)
        # 2e4 converts the gradient in Å^2 ps^-1 to cm^2 s^-1, including the 2d divisor.
        self._diffusion_coefficient = Distribution(self.gradient.samples / (2e4 * self.dims))

    @property
    def D(self) -> Union[Distribution, None]:
        """
        An alias for the diffusion coefficient Distribution.

        :return: Diffusion coefficient, with units of cm:sup:`2`s:sup:`-1`.
        """
        return self._diffusion_coefficient

    def jump_diffusion(self, start_dt: float, **kwargs):
        """
        Use the bootstrap-GLS method to determine the jump diffusivity for the system. Keyword arguments
        will be passed on to the :py:func:`bootstrap_GLS` method.

        :param start_dt: The starting time for the analysis to find the diffusion coefficient.
            This should be the start of the diffusive regime in the simulation.
        """
        self.bootstrap_GLS(start_dt, **kwargs)
        # Additionally normalised by the number of atoms.
        self._jump_diffusion_coefficient = Distribution(self.gradient.samples /
                                                        (2e4 * self.dims * self._displacements[0].shape[0]))

    @property
    def D_J(self) -> Union[Distribution, None]:
        """
        Alias for the jump diffusion coefficient Distribution.

        :return: Jump diffusion coefficient, with units of cm:sup:`2`s:sup:`-1`.
        """
        return self._jump_diffusion_coefficient

    def conductivity(self, start_dt: float, temperature: float, volume: float, **kwargs):
        """
        Use the bootstrap-GLS method to determine the ionic conductivity for the system, in units of mScm:sup:`-1`.
        Keyword arguments will be passed on to the :py:func:`bootstrap_GLS` method.

        :param start_dt: The starting time for the analysis to find the diffusion coefficient.
            This should be the start of the diffusive regime in the simulation.
        :param temperature: System temperature, in Kelvin.
        :param volume: System volume, in Å^{3}.
        """
        self.bootstrap_GLS(start_dt, **kwargs)
        volume = volume * 1e-24  # cm^3
        D = self.gradient.samples / (2e4 * self.dims)  # cm^2s^-1
        # Nernst-Einstein relation, with the 1000 factor converting to mS cm^-1.
        conversion = 1000 / (volume * const.N_A) * (const.N_A * const.e)**2 / (const.R * temperature)
        self._sigma = Distribution(D * conversion)

    @property
    def sigma(self) -> Union[Distribution, None]:
        """
        :return: The estimated conductivity, based on the generalised least squares approach, with
            units mScm:sup:`-1`.
        """
        return self._sigma

    def posterior_predictive(self,
                             n_posterior_samples: int = None,
                             n_predictive_samples: int = 256,
                             progress: bool = True) -> np.ndarray:
        """
        Sample the posterior predictive distribution. The shape of the resulting array will be
        `(n_posterior_samples * n_predictive_samples, start_dt)`.

        Note: this requires that :py:func:`bootstrap_GLS` has been run with
        :py:attr:`fit_intercept=True`, as the intercept samples are used below.

        :param n_posterior_samples: Number of samples from the posterior distribution.
            Optional, default is the number of posterior samples.
        :param n_predictive_samples: Number of random samples per sample from the posterior distribution.
            Optional, default is :py:attr:`256`.
        :param progress: Show tqdm progress for sampling. Optional, default is :py:attr:`True`.
        :return: Samples from the posterior predictive distribution.
        """
        if n_posterior_samples is None:
            n_posterior_samples = self.gradient.size
        diff_regime = np.argwhere(self._dt >= self._start_dt)[0][0]
        ppd = np.zeros((n_posterior_samples, n_predictive_samples, self._dt[diff_regime:].size))
        samples_to_draw = list(enumerate(np.random.randint(0, self.gradient.size, size=n_posterior_samples)))
        if progress:
            iterator = tqdm.tqdm(samples_to_draw, desc='Calculating Posterior Predictive')
        else:
            iterator = samples_to_draw
        for i, n in iterator:
            mu = self.gradient.samples[n] * self._dt[diff_regime:] + self.intercept.samples[n]
            mv = multivariate_normal(mean=mu, cov=self._covariance_matrix, allow_singular=True)
            ppd[i] = mv.rvs(n_predictive_samples)
        return ppd.reshape(-1, self._dt[diff_regime:].size)
class MSDBootstrap(Bootstrap):
    """
    Perform a bootstrap resampling to obtain accurate estimates for the mean and uncertainty for the mean
    squared displacements.

    :param delta_t: An array of the timestep values, units of ps
    :param disp_3d: A list of arrays, where each array has the axes
        :code:`[atom, displacement observation, dimension]`. There is one array in the list for each
        delta_t value. Note: it is necessary to use a list of arrays as the number of observations is
        not necessarily the same at each data point.
    :param n_o: Number of statistically independent observations of the MSD at each timestep.
    :param sub_sample_dt: The frequency in observations to be sampled. Default is :py:attr:`1` (every observation)
    :param bootstrap: Should bootstrap resampling be used to estimate the observed MSD distribution.
        Optional, default is :py:attr:`False`.
    :param block: Should the blocking method be used to estimate the variance, if :py:attr:`False` an
        approximation is used to estimate the variance. Optional, default is :py:attr:`False`.
    :param n_resamples: The initial number of resamples to be performed. Default is :py:attr:`1000`
    :param max_resamples: The max number of resamples to be performed before the distribution is assumed to
        be normal. This is present to allow user control over the time taken for the resampling to occur.
        Default is :py:attr:`10000`
    :param dimension: Dimension/s to find the displacement along, this should be some subset of `'xyz'` indicating
        the axes of interest. Optional, defaults to `'xyz'`.
    :param alpha: Value that p-value for the normal test must be greater than to accept. Default is :py:attr:`1e-3`
    :param random_state : A :py:attr:`RandomState` object to be used to ensure reproducibility. Default
        is :py:attr:`None`
    :param progress: Show tqdm progress for sampling. Default is :py:attr:`True`
    """

    def __init__(self,
                 delta_t: np.ndarray,
                 disp_3d: List[np.ndarray],
                 n_o: np.ndarray,
                 sub_sample_dt: int = 1,
                 bootstrap: bool = False,
                 block: bool = False,
                 n_resamples: int = 1000,
                 max_resamples: int = 10000,
                 dimension: str = 'xyz',
                 alpha: float = 1e-3,
                 random_state: np.random.mtrand.RandomState = None,
                 progress: bool = True):
        super().__init__(delta_t, disp_3d, n_o, sub_sample_dt, dimension)
        self._iterator = self.iterator(progress, range(len(self._displacements)))
        if block:
            # pyblock is an optional dependency, only imported when the
            # blocking variance estimator is requested.
            import pyblock
            print('You are using the blocking method to estimate variances, please cite '
                  'doi:10.1063/1.457480 and the pyblock package.')
        for i in self._iterator:
            disp_slice = self._displacements[i][:, :, self._slice].reshape(self._displacements[i].shape[0],
                                                                           self._displacements[i].shape[1], self.dims)
            d_squared = np.sum(disp_slice**2, axis=-1)
            # A single observation cannot provide a distribution.
            if d_squared.size <= 1:
                continue
            self._euclidian_displacements.append(Distribution(np.sqrt(d_squared.flatten())))
            if bootstrap:
                distro = self.sample_until_normal(d_squared, n_o[i], n_resamples, max_resamples, alpha, random_state)
                self._distributions.append(distro)
                self._n_bootstrap = np.append(self._n_bootstrap, np.mean(distro.samples))
                self._v_bootstrap = np.append(self._v_bootstrap, np.var(distro.samples, ddof=1))
                self._s_bootstrap = np.append(self._s_bootstrap, np.std(distro.samples, ddof=1))
            if block:
                reblock = pyblock.blocking.reblock(d_squared.flatten())
                opt_block = pyblock.blocking.find_optimal_block(d_squared.flatten().size, reblock)
                try:
                    mean = reblock[opt_block[0]].mean
                    var = reblock[opt_block[0]].std_err**2
                except TypeError:
                    # No optimal block could be identified; fall back to the
                    # largest block size.
                    mean = reblock[-1].mean
                    var = reblock[-1].std_err**2
                self._n = np.append(self._n, mean)
                self._v = np.append(self._v, var)
                # Use the variance just computed rather than self._v[i]: indexing
                # by the loop counter misaligns whenever a timestep was skipped
                # by the `continue` above.
                self._s = np.append(self._s, np.sqrt(var))
            else:
                self._n = np.append(self._n, d_squared.mean())
                self._v = np.append(self._v, np.var(d_squared, ddof=1) / n_o[i])
                # -1 indexes the value appended on the line above; indexing by
                # the loop counter breaks after any skipped timestep.
                self._s = np.append(self._s, np.sqrt(self._v[-1]))
            self._ngp = np.append(self._ngp, self.ngp_calculation(d_squared))
            self._dt = np.append(self._dt, self._delta_t[i])
        self._n_o = self._n_o[:self._n.size]
class MSTDBootstrap(Bootstrap):
    """
    Perform a bootstrap resampling to obtain accurate estimates for the mean and uncertainty for the total
    mean squared displacements.

    :param delta_t: An array of the timestep values.
    :param disp_3d: A list of arrays, where each array has the axes
        :code:`[atom, displacement observation, dimension]`. There is one array in the list for each
        delta_t value. Note: it is necessary to use a list of arrays as the number of observations is
        not necessarily the same at each data point.
    :param n_o: Number of statistically independent observations of the MSD at each timestep.
    :param sub_sample_dt: The frequency in observations to be sampled. Optional, default
        is :py:attr:`1` (every observation).
    :param bootstrap: Should bootstrap resampling be used to estimate the observed MSD distribution.
        Optional, default is :py:attr:`False`.
    :param block: Should the blocking method be used to estimate the variance, if :py:attr:`False` an
        approximation is used to estimate the variance. Optional, default is :py:attr:`False`.
    :param n_resamples: The initial number of resamples to be performed. Optional, default
        is :py:attr:`1000`
    :param max_resamples: The max number of resamples to be performed before the distribution is assumed to be
        normal. This is present to allow user control over the time taken for the resampling to occur.
        Optional, default is :py:attr:`10000`
    :param dimension: Dimension/s to find the displacement along, this should be some subset of `'xyz'` indicating
        the axes of interest. Optional, defaults to `'xyz'`.
    :param alpha: Value that p-value for the normal test must be greater than to accept. Optional, default
        is :py:attr:`1e-3`
    :param random_state : A :py:attr:`RandomState` object to be used to ensure reproducibility. Optional,
        default is :py:attr:`None`
    :param progress: Show tqdm progress for sampling. Optional, default is :py:attr:`True`
    """

    def __init__(self,
                 delta_t: np.ndarray,
                 disp_3d: List[np.ndarray],
                 n_o: np.ndarray,
                 sub_sample_dt: int = 1,
                 bootstrap: bool = False,
                 block: bool = False,
                 n_resamples: int = 1000,
                 max_resamples: int = 10000,
                 dimension: str = 'xyz',
                 alpha: float = 1e-3,
                 random_state: np.random.mtrand.RandomState = None,
                 progress: bool = True):
        super().__init__(delta_t, disp_3d, n_o, sub_sample_dt, dimension)
        # Only the first half of the timesteps are analysed for the collective
        # motion, where the statistics are better.
        self._iterator = self.iterator(progress, range(int(len(self._displacements) / 2)))
        if block:
            # pyblock is an optional dependency, only imported when the
            # blocking variance estimator is requested.
            import pyblock
            print('You are using the blocking method to estimate variances, please cite '
                  'doi:10.1063/1.457480 and the pyblock package.')
        for i in self._iterator:
            disp_slice = self._displacements[i][:, :, self._slice].reshape(self._displacements[i].shape[0],
                                                                           self._displacements[i].shape[1], self.dims)
            d_squared = np.sum(disp_slice**2, axis=-1)
            # The total (collective) squared displacement sums over atoms
            # before squaring.
            coll_motion = np.sum(np.sum(disp_slice, axis=0)**2, axis=-1)
            if coll_motion.size <= 1:
                continue
            self._euclidian_displacements.append(Distribution(np.sqrt(d_squared.flatten())))
            if bootstrap:
                distro = self.sample_until_normal(coll_motion, n_o[i] / d_squared.shape[0], n_resamples, max_resamples,
                                                  alpha, random_state)
                self._distributions.append(distro)
                self._n_bootstrap = np.append(self._n_bootstrap, np.mean(distro.samples))
                self._v_bootstrap = np.append(self._v_bootstrap, np.var(distro.samples, ddof=1))
                self._s_bootstrap = np.append(self._s_bootstrap, np.std(distro.samples, ddof=1))
            if block:
                reblock = pyblock.blocking.reblock(coll_motion.flatten())
                opt_block = pyblock.blocking.find_optimal_block(coll_motion.flatten().size, reblock)
                try:
                    mean = reblock[opt_block[0]].mean
                    var = reblock[opt_block[0]].std_err**2
                except TypeError:
                    # No optimal block could be identified; fall back to the
                    # largest block size.
                    mean = reblock[-1].mean
                    var = reblock[-1].std_err**2
                self._n = np.append(self._n, mean)
                self._v = np.append(self._v, var)
                # Use the variance just computed rather than self._v[i]: indexing
                # by the loop counter misaligns whenever a timestep was skipped
                # by the `continue` above.
                self._s = np.append(self._s, np.sqrt(var))
            else:
                self._n = np.append(self._n, coll_motion.mean())
                self._v = np.append(self._v, np.var(coll_motion, ddof=1) / (n_o[i] / d_squared.shape[0]))
                # -1 indexes the value appended on the line above; indexing by
                # the loop counter breaks after any skipped timestep.
                self._s = np.append(self._s, np.sqrt(self._v[-1]))
            self._ngp = np.append(self._ngp, self.ngp_calculation(d_squared.flatten()))
            self._dt = np.append(self._dt, self._delta_t[i])
        self._n_o = self._n_o[:self._n.size]
class MSCDBootstrap(Bootstrap):
    """
    Perform a bootstrap resampling to obtain accurate estimates for the mean and uncertainty for the mean
    squared charge displacements.

    :param delta_t: An array of the timestep values.
    :param disp_3d: A list of arrays, where each array has the axes
        :code:`[atom, displacement observation, dimension]`. There is one array in the list for each
        delta_t value. Note: it is necessary to use a list of arrays as the number of observations is
        not necessarily the same at each data point.
    :param ionic_charge: The charge on the mobile ions, either an array with a value for each ion or a scalar
        if all values are the same.
    :param n_o: Number of statistically independent observations of the MSD at each timestep.
    :param bootstrap: Should bootstrap resampling be used to estimate the observed MSD distribution.
        Optional, default is :py:attr:`False`.
    :param block: Should the blocking method be used to estimate the variance, if :py:attr:`False` an
        approximation is used to estimate the variance. Optional, default is :py:attr:`False`.
    :param sub_sample_dt: The frequency in observations to be sampled. Optional, default is :py:attr:`1`
        (every observation).
    :param n_resamples: The initial number of resamples to be performed. Optional, default is :py:attr:`1000`.
    :param max_resamples: The max number of resamples to be performed before the distribution is assumed to be
        normal. This is present to allow user control over the time taken for the resampling to occur.
        Optional, default is :py:attr:`10000`.
    :param dimension: Dimension/s to find the displacement along, this should be some subset of `'xyz'` indicating
        the axes of interest. Optional, defaults to `'xyz'`.
    :param alpha: Value that p-value for the normal test must be greater than to accept. Optional, default
        is :py:attr:`1e-3`.
    :param random_state: A :py:attr:`RandomState` object to be used to ensure reproducibility. Optional,
        default is :py:attr:`None`.
    :param progress: Show tqdm progress for sampling. Optional, default is :py:attr:`True`.
    """

    def __init__(self,
                 delta_t: np.ndarray,
                 disp_3d: List[np.ndarray],
                 ionic_charge: Union[np.ndarray, int],
                 n_o: np.ndarray,
                 sub_sample_dt: int = 1,
                 bootstrap: bool = False,
                 block: bool = False,
                 n_resamples: int = 1000,
                 max_resamples: int = 10000,
                 dimension: str = 'xyz',
                 alpha: float = 1e-3,
                 random_state: np.random.mtrand.RandomState = None,
                 progress: bool = True):
        super().__init__(delta_t, disp_3d, n_o, sub_sample_dt, dimension)
        # Only the first half of the timesteps are analysed for the collective
        # motion, where the statistics are better.
        self._iterator = self.iterator(progress, range(int(len(self._displacements) / 2)))
        try:
            _ = len(ionic_charge)
        except TypeError:
            # A scalar charge is broadcast to one value per atom.
            ionic_charge = np.ones(self._displacements[0].shape[0]) * ionic_charge
        if block:
            # pyblock is an optional dependency, only imported when the
            # blocking variance estimator is requested.
            import pyblock
            print('You are using the blocking method to estimate variances, please cite '
                  'doi:10.1063/1.457480 and the pyblock package.')
        for i in self._iterator:
            disp_slice = self._displacements[i][:, :, self._slice].reshape(self._displacements[i].shape[0],
                                                                           self._displacements[i].shape[1], self.dims)
            d_squared = np.sum(disp_slice**2, axis=-1)
            # NOTE(review): unlike d_squared above, the charge displacement is
            # computed from the full xyz displacements rather than the
            # dimension slice — confirm that `dimension` should be ignored here.
            sq_chg_motion = np.sum(np.sum((ionic_charge * self._displacements[i].T).T, axis=0)**2, axis=-1)
            if sq_chg_motion.size <= 1:
                continue
            self._euclidian_displacements.append(Distribution(np.sqrt(d_squared.flatten())))
            if bootstrap:
                distro = self.sample_until_normal(sq_chg_motion, n_o[i] / d_squared.shape[0], n_resamples,
                                                  max_resamples, alpha, random_state)
                self._distributions.append(distro)
                self._n_bootstrap = np.append(self._n_bootstrap, np.mean(distro.samples))
                self._v_bootstrap = np.append(self._v_bootstrap, np.var(distro.samples, ddof=1))
                self._s_bootstrap = np.append(self._s_bootstrap, np.std(distro.samples, ddof=1))
            if block:
                reblock = pyblock.blocking.reblock(sq_chg_motion.flatten())
                opt_block = pyblock.blocking.find_optimal_block(sq_chg_motion.flatten().size, reblock)
                try:
                    mean = reblock[opt_block[0]].mean
                    var = reblock[opt_block[0]].std_err**2
                except TypeError:
                    # No optimal block could be identified; fall back to the
                    # largest block size.
                    mean = reblock[-1].mean
                    var = reblock[-1].std_err**2
                self._n = np.append(self._n, mean)
                self._v = np.append(self._v, var)
                # Use the variance just computed rather than self._v[i]: indexing
                # by the loop counter misaligns whenever a timestep was skipped
                # by the `continue` above.
                self._s = np.append(self._s, np.sqrt(var))
            else:
                self._n = np.append(self._n, sq_chg_motion.mean())
                self._v = np.append(self._v, np.var(sq_chg_motion, ddof=1) / (n_o[i] / d_squared.shape[0]))
                # -1 indexes the value appended on the line above; indexing by
                # the loop counter breaks after any skipped timestep.
                self._s = np.append(self._s, np.sqrt(self._v[-1]))
            self._ngp = np.append(self._ngp, self.ngp_calculation(d_squared.flatten()))
            self._dt = np.append(self._dt, self._delta_t[i])
        self._n_o = self._n_o[:self._n.size]
def _bootstrap(array: np.ndarray,
               n_samples: int,
               n_resamples: float,
               random_state: np.random.mtrand.RandomState = None) -> List[float]:
    """
    Perform a set of resamples.

    :param array: The array to sample from.
    :param n_samples: Number of samples.
    :param n_resamples: Number of resamples to perform.
    :param random_state: A :py:attr:`RandomState` object to be used to ensure reproducibility. Optional,
        default is :py:attr:`None`
    :return: Simulated means from resampling the array.
    """
    # Flatten once, outside the loop; each iteration draws n_samples values
    # with replacement and records their mean.
    flat = array.flatten()
    means = []
    for _ in range(n_resamples):
        draw = resample(flat, n_samples=int(n_samples), random_state=random_state)
        means.append(np.mean(draw.flatten()))
    return means
def _bayesian_bootstrap(array: np.ndarray,
n_samples: float,
n_resamples: int,
random_state: np.random.mtrand.RandomState = None) -> List[float]:
"""
Performs a Bayesian bootstrap simulation of the posterior distribution of the mean of observed values,
using a sparse Dirichlet prior for sample weights.
The sparsity of the Dirichlet prior for the sample weights is controlled by a concentration parameter
alpha, where alpha = k(N-1)/(k-1). k is the dimensionality of the array of observed values, and
N can be considered an effective number of samples for each set of sample weights.
alpha has been chosen to vary linearly with N, and gives a flat Dirichlet prior when N=k,
and a uniform categorical prior when N=1.
:param array: The array to sample from.
:param n_samples: The effective number of samples for each set of simulated weights.
:param n_resamples: Number of resamples to perform.
:param random_state: A :py:attr:`RandomState` object. Optional, default is :py:attr:`None`
:return: Samples from the simulated posterior distribution of the mean of the array.
"""
if random_state == None:
random_state = np.random.mtrand.RandomState()
values = array.flatten()
k = len(values)
alphak = (n_samples - 1) / (k - 1)
if alphak > 0:
weights = random_state.dirichlet(alpha=np.ones(k) * alphak, size=n_resamples)
else:
# Sample from a uniform categorical distribution, equivalent to Dirichlet([0,0,0,…])
weights = random_state.multinomial(n=1, pvals=np.ones(k) / k, size=n_resamples)
return list(np.sum(weights * values, axis=1))
def _populate_covariance_matrix(variances: np.ndarray, n_samples: np.ndarray) -> np.ndarray:
"""
Populate the covariance matrix for the generalised least squares methodology.
:param variances: The variances for each timestep
:param n_samples: Number of independent trajectories for each timestep
:return: An estimated covariance matrix for the system
"""
covariance_matrix = np.zeros((variances.size, variances.size))
for i in range(0, variances.size):
for j in range(i, variances.size):
ratio = n_samples[i] / n_samples[j]
value = ratio * variances[i]
covariance_matrix[i, j] = value
covariance_matrix[j, i] = np.copy(covariance_matrix[i, j])
return covariance_matrix
def _straight_line(abscissa: np.ndarray, gradient: float, intercept: float = 0.0) -> np.ndarray:
"""
A one dimensional straight line function.
:param abscissa: The abscissa data.
:param gradient: The slope of the line.
:param intercept: The y-intercept of the line. Optional, default is :py:attr:`0.0`.
:return: The resulting ordinate.
"""
return gradient * abscissa + intercept
def minimum_eigenvalue_method(matrix: np.ndarray, cond_max=1e16) -> np.ndarray:
    """
    Implementation of the matrix reconditioning method known as the minimum
    eigenvalue method, as outlined in doi:10.1080/16000870.2019.1696646. This
    should produce a matrix with a condition number of at most :py:attr:`cond_max`
    based on the eigenvalues and eigenvectors of the input matrix.

    :param matrix: Matrix to recondition.
    :param cond_max: Maximum condition number of the output matrix. Optional,
        default is :py:attr:`1e16`.
    :return: Reconditioned matrix.
    """
    # Tuple unpacking keeps compatibility with NumPy < 1.25, which returns a
    # plain (eigenvalues, eigenvectors) pair rather than an EigResult namedtuple.
    eigenvalues, eigenvectors = np.linalg.eig(matrix)
    new_eigenvalues = np.copy(eigenvalues)
    # The condition number is lambda_max / lambda_min, so the eigenvalue floor
    # must be derived from the LARGEST eigenvalue. np.linalg.eig does not sort
    # its output, so the previous use of eigenvalues[0] was only correct when
    # the largest eigenvalue happened to come first.
    threshold = np.max(new_eigenvalues) / cond_max
    new_eigenvalues[np.where(new_eigenvalues < threshold)] = threshold
    return np.real(eigenvectors @ np.diag(new_eigenvalues) @ eigenvectors.T)