phydev/trajpy

View on GitHub
trajpy/traj_generator.py

Summary

Maintainability
A
1 hr
Test Coverage
import numpy as np
from typing import Union, Tuple

def weierstrass_mandelbrot(t: float, n_displacements: int, alpha: float) -> float:
    """
    Calculates the weierstrass mandelbrot function
    
    .. math::
        W(t) = \\sum_{n=-\\infty}^{\\infty} \\frac{\\cos{(\\phi_n )} - \\cos{(\\gamma^n t^* + \\phi_n )} }{\\gamma^{n\\alpha/2}} \\, .

    :param t: time step
    :param n_displacements: number of displacements
    :param alpha: anomalous exponent
    :return: anomalous step
    """
    gamma = np.sqrt(np.pi)
    t_star = (2. * np.pi * t) / n_displacements

    wsm = 0.

    for iteration in range(-8, 49):  # [-8, 48]
        phi = 2. * np.random.rand() * np.pi
        wsm += (np.cos(phi) - np.cos(np.power(gamma, iteration) * t_star + phi)) / \
               (np.power(gamma, iteration * (alpha / 2.)))
    return wsm


def anomalous_diffusion(n_steps: int, n_samples: int, time_step: float, alpha: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates an ensemble of anomalous trajectories.

    :param n_steps: total number of steps
    :param n_samples: number of simulations
    :param time_step: time step
    :param alpha: anomalous exponent
    :return x, y: time, array containing N_sample trajectories with Nsteps
    """
    x = np.zeros(n_steps) * time_step
    y = np.zeros((n_steps, n_samples))

    for i_sample in range(0, n_samples):

        for i_step in range(0, n_steps):
            t = i_step * time_step
            y[i_step, i_sample] = weierstrass_mandelbrot(t, n_steps, alpha=alpha)
            x[i_step] = t

    if n_samples == 1:
        y = y.transpose()[0]

    return x, y


def normal_distribution(u: float, D: float, dt: float) -> float:
    """
    This is the steplength probability density function for normal diffusion.

    :param u: absolute distance travelled by the particle durint the time interval dt
    :param D: diffusivity
    :param dt: time interval
    :return pdf: probability density function

    """
    diff = 4. * D * dt
    pdf = ((2. * u) / diff) * np.exp(-np.power(u, 2) / diff)
    return pdf


def normal_diffusion(n_steps: int, n_samples: int, dx: float, y0: float, D: float, dt: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates an ensemble of normal diffusion trajectories.

    :param n_steps: total steps
    :param n_samples: number of trajectories
    :param dx: maximum step length
    :param y0: starting position
    :param D: diffusivity
    :param dt: time step
    :return x, y: time, array containing N_samples trajectories with N_steps
    """
    
    y = np.zeros((n_steps, n_samples))
    x = np.linspace(0, n_steps, n_steps) 
    y[0, :] = y0
    
    for i_sample in range(0, n_samples):
        i_step = 1
        while True:
            
            if i_step >= n_steps:
                break
            
            random_number = np.random.rand()
            u = (0.5 - random_number) * dx  # step length and direction
            if random_number >= normal_distribution(np.abs(u), D, dt):
                y[i_step, i_sample] = y[i_step-1, i_sample] + u

            i_step += 1       
    return x, y


def confined_diffusion(radius: float, n_steps: int, n_samples: int, dx: float, y0: float, D: float, dt: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates trajectories under confinement.

    :param radius: confinement radius
    :param n_steps: number of displacements
    :param n_samples: number of trajectories
    :param dx: displacement 
    :param y0: initial position
    :param D: diffusion coefficient
    :param dt: time step
    :return x, y: time, array containing N_samples trajectories with N_steps
    """
    y = np.zeros((n_steps, n_samples))
    x = np.linspace(0, n_steps, n_steps) 
    y[0, :] = y0
    sub_step = 0.0
    for i_sample in range(0, n_samples):  
       
        for i_step in range(0, n_steps):
            
            sub_x, sub_y = normal_diffusion(n_steps=100, n_samples=1, dx=dx, y0=sub_step, D=D, dt=dt)
            
            if sub_y[-1] < radius:
                t = i_step * dt
                y[i_step, i_sample] = sub_y[-1]
                sub_step = sub_y[-1]
                x[i_step] = t

    return x, y


def superdiffusion(velocity: float, n_steps: int, n_samples: int, y0: float, dt: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates direct diffusion trajectories. 
    Combine pairwise with normal diffusion components.
    
    :param velocity: constant velocity 
    :param n_steps: number of time steps
    :param n_samples: number of trajectories
    :param y0: initial position
    :param dt: time interval
    :return x, y: time, array containing N_samples trajectories with N_steps
    """
    y = np.zeros((n_steps, n_samples))
    x = np.linspace(0, n_steps, n_steps) 
    y[0, :] = y0
    
    for i_sample in range(0,n_samples):  
       
        for i_step in range(1, n_steps):
            y[i_step, i_sample] = y[i_step-1, i_sample] + velocity * dt
            x[i_step] = i_step * dt
            
    return x, y


def save_to_file(y: np.ndarray, param: Union[int, float, str], path: str) -> None:
    """
    Saves the trajectories to a file.

    :param y: trajectory array
    :param param: a parameter that characterizes the kind of trajectory
    :param path: path to the folder where the file will be saved
    """
    dims = ['x','y']
    np.savetxt(path + '/traj_' + str(param) + '.csv',y,delimiter=',',
      header='t,'+(",".join(dims[:y.shape[1]]*int((y.shape[1])/2))),comments='')