pytim/observables/rdf.py
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding: utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
""" Module: RDF
===========
"""
from __future__ import print_function
import numpy as np
from .distributionfunction import DistributionFunction
from MDAnalysis.lib import distances
from . import Distance
class RDF(DistributionFunction):
r"""Calculates a radial distribution function of some observable from two
groups.
The two functions must return an array (of scalars or of vectors)
having the same size of the group. The scalar product between the
two functions is used to weight the distriution function.
.. math::
g(r) = \frac{1}{N}\left\langle \sum_{i\neq j} \delta(r-|r_i-r_j|)\
f_1(r_i,v_i)\cdot f_2(r_j,v_j) \right\rangle
:param double max_radius: compute the rdf up to this distance.
If 'full' is supplied (default) computes
it up to half of the smallest box side.
:param int nbins: number of bins
:param Observable observable: observable for the first group
:param Observable observable2: observable for the second group
Example:
>>> import MDAnalysis as mda
>>> import numpy as np
>>> import pytim
>>> from pytim import observables
>>> from pytim.datafiles import *
>>>
>>> u = mda.Universe(WATER_GRO,WATER_XTC)
>>> oxygens = u.select_atoms("name OW")
>>>
>>> nres = observables.NumberOfResidues()
>>>
>>> rdf = observables.RDF(u,nbins=120,observable=nres,observable2=nres)
>>>
>>> interface = pytim.ITIM(u,alpha=2.,group=oxygens,cluster_cut=3.5,molecular=False)
>>>
>>> for ts in u.trajectory[::50]:
... layer=interface.layers[0,0]
... rdf.sample(layer,layer)
>>> rdf.count[0]=0
>>> np.savetxt('RDF3D.dat', np.column_stack((rdf.bins,rdf.rdf)))
Note that one needs to specify neither both groups, not both observables.
If only the first group (observable) is specified, the second is assumed
to be the same as the first, as in the following example:
>>> rdf1 = observables.RDF(u,observable=nres)
>>> rdf2 = observables.RDF(u,observable=nres)
>>> rdf3 = observables.RDF(u,observable=nres,observable2=nres)
>>>
>>> rdf1.sample(layer)
>>> rdf2.sample(layer,layer)
>>> rdf3.sample(layer,layer)
>>> print (np.all(rdf1.rdf[:]==rdf2.rdf[:]))
True
>>> print (np.all(rdf1.rdf[:]==rdf3.rdf[:]))
True
"""
def __init__(self,
universe,
max_distance='full',
nbins=75,
start=None,
stop=None,
step=None,
observable=None,
observable2=None,
**kargs):
try:
max_distance = kargs['max_radius']
except:
pass
if max_distance == 'full':
max_distance = np.min(universe.dimensions[:3]) / 2.
self._distance = Distance()
DistributionFunction.__init__(
self,
universe,
order=2,
nbins=nbins,
start=start,
stop=stop,
step=step,
generalized_coordinate=self._distance,
generalized_coordinate2=self._distance,
observable=observable,
observable2=observable2,
max_distance=max_distance,
coords_in=['x', 'y', 'z'],
coords_out=['r'],
kargs1=None,
kargs2=None)
def sample(self, g1=None, g2=None, kargs1=None, kargs2=None):
if g2 is None:
g2 = g1
DistributionFunction.sample(
self,
g1=g1,
g2=g2,
kargs1=kargs1,
kargs2=kargs2)
@property
def bins(self):
return 0.5 * (self.edges[0][:-1] + self.edges[0][1:])
@property
def rdf(self):
# Volume in each radial shell
dr = (self.edges[0][1:] - self.edges[0][:-1])
avr = (self.edges[0][1:] + self.edges[0][:-1]) / 2.
vol = 4. * np.pi * avr**2 * dr
# normalization
density = self.n_normalize / self.volume
self._rdf = self.count / (density * vol * self.n_frames)
return self._rdf
class oldRDF(object):
r"""Calculates a radial distribution function of some observable from two
groups.
The two functions must return an array (of scalars or of vectors)
having the same size of the group. The scalar product between the
two functions is used to weight the distriution function.
.. math::
g(r) = \frac{1}{N}\left\langle \sum_{i\neq j} \delta(r-|r_i-r_j|)\
f_1(r_i,v_i)\cdot f_2(r_j,v_j) \right\rangle
:param double max_radius: compute the rdf up to this distance.
If 'full' is supplied (default) computes
it up to half of the smallest box side.
:param int nbins: number of bins
:param Observable observable: observable for the first group
:param Observable observable2: observable for the second group
Example:
>>> import MDAnalysis as mda
>>> import numpy as np
>>> import pytim
>>> from pytim import observables
>>> from pytim.datafiles import *
>>>
>>> u = mda.Universe(WATER_GRO,WATER_XTC)
>>> oxygens = u.select_atoms("name OW")
>>>
>>> nres = observables.NumberOfResidues()
>>>
>>> rdf = observables.RDF(u,nbins=120,observable=nres,observable2=nres)
>>>
>>> interface = pytim.ITIM(u,alpha=2.,group=oxygens,cluster_cut=3.5,molecular=False)
>>>
>>> for ts in u.trajectory[::50]:
... layer=interface.layers[0,0]
... rdf.sample(layer,layer)
>>> rdf.count[0]=0
>>> np.savetxt('RDF3D.dat', np.column_stack((rdf.bins,rdf.rdf)))
Note that one needs to specify neither both groups, not both observables.
If only the first group (observable) is specified, the second is assumed
to be the same as the first, as in the following example:
>>> rdf1 = observables.RDF(u,observable=nres)
>>> rdf2 = observables.RDF(u,observable=nres)
>>> rdf3 = observables.RDF(u,observable=nres,observable2=nres)
>>>
>>> rdf1.sample(layer)
>>> rdf2.sample(layer,layer)
>>> rdf3.sample(layer,layer)
>>> print (np.all(rdf1.rdf[:]==rdf2.rdf[:]))
True
>>> print (np.all(rdf1.rdf[:]==rdf3.rdf[:]))
True
"""
def __init__(self,
universe,
nbins=75,
max_radius='full',
start=None,
stop=None,
step=None,
observable=None,
observable2=None,
kargs1=None,
kargs2=None):
kargs1 = kargs1 or {}
kargs2 = kargs2 or {}
if max_radius == 'full':
self.max_radius = np.min(universe.dimensions[:3]) / 2.
else:
self.max_radius = max_radius
self.rdf_settings = {'bins': nbins, 'range': (0., self.max_radius)}
self.universe = universe
self.nsamples = 0
self.observable = observable
self.kargs1 = kargs1
self.kargs2 = kargs2
if observable2 is None:
self.observable2 = observable
else:
self.observable2 = observable2
self.n_frames = 0
self.volume = 0.0
self.n_squared = 0
count, edges = np.histogram([-1, -1], **self.rdf_settings)
self.count = count * 0.0
self.edges = edges
self.bins = 0.5 * (edges[:-1] + edges[1:])
self.g1 = self.universe.atoms
self.g2 = None
self._rdf = self.count
def _compute_observable(self, ka1, ka2):
try:
fg1 = self.observable.compute(self.g1, ka1)
except:
fg1 = self.observable.compute(self.g1)
if (self.g1 == self.g2 and self.observable == self.observable2):
fg2 = fg1
else:
try:
fg2 = self.observable2.compute(self.g2, ka2)
except:
fg2 = self.observable2.compute(self.g2)
try:
error = (fg1.shape[0] != self.g1.n_atoms
or fg2.shape[0] != self.g2.n_atoms)
except:
error = True
return fg1, fg2, error
def _determine_weights(self, fg1, fg2):
# both are (arrays of) scalars
if len(fg1.shape) == 1 and len(fg2.shape) == 1:
weights = np.outer(fg1, fg2)
# both are (arrays of) vectors
elif len(fg1.shape) == 2 and len(fg2.shape) == 2:
# TODO: tests on the second dimension...
weights = np.dot(fg1, fg2.T)
else:
raise Exception("Error, shape of the observable output not handled"
"in RDF")
return weights
def sample(self, g1=None, g2=None, kargs1=None, kargs2=None):
kargs1 = kargs1 or {}
kargs2 = kargs2 or {}
self.n_frames += 1
self.g2 = g2
if g1 is not None:
self.g1 = g1
if g2 is None:
self.g2 = self.g1 # all atoms by default (see __init__)
ka1 = self.kargs1.copy()
ka1.update(kargs1)
ka2 = self.kargs2.copy()
ka2.update(kargs2)
if self.observable is not None:
# determine weights, otherwise assumes number of atoms (default)
fg1, fg2, error = self._compute_observable(ka1, ka2)
if error is True:
raise Exception(
"Error, the observable passed to RDF should output "
"an array (of scalar or vectors) the same size of "
"the group")
# numpy.histogram accepts negative weights
self.rdf_settings['weights'] = self._determine_weights(fg1, fg2)
# This still uses MDA's distance_array. Pro: works also in triclinic
# boxes. Con: could be faster (?)
_distances = np.zeros((len(self.g1), len(self.g2)), dtype=np.float64)
distances.distance_array(
self.g1.positions,
self.g2.positions,
box=self.universe.dimensions,
result=_distances)
count = np.histogram(_distances, **self.rdf_settings)[0]
self.count += count
box = self.universe.dimensions
self.volume += np.product(box[:3])
self.nsamples += 1
self.n_squared += len(self.g1) * len(self.g2)
@property
def rdf(self):
# Volume in each radial shell
dr = (self.edges[1:] - self.edges[:-1])
avr = (self.edges[1:] + self.edges[:-1]) / 2.
vol = 4. * np.pi * avr**2 * dr
# normalization
density = self.n_squared / self.volume
self._rdf = self.count / (density * vol * self.n_frames)
return self._rdf