
View on GitHub


1 day
Test Coverage
MDSuite: A Zincwarecode package.

This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html

SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincwarecode Project.

Contact Information
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/

If you use this module please cite us with:

import dataclasses
import logging
import pathlib
import time
import typing
from typing import List

import h5py as hf
import numpy as np
import tensorflow as tf

from mdsuite.utils.meta_functions import join_path

log = logging.getLogger(__name__)

class PropertyInfo:
    Information of a trajectory property.

    pos_info = PropertyInfo('Positions', 3)
    vel_info = PropertyInfo('Velocities', 3).

        The name of the property
        The dimensionality of the property

    name: str
    n_dims: int

class SpeciesInfo:
    Information of a species.

        Name of the species (e.g. 'Na')
        Number of particles of that species
    properties: list of PropertyInfo
        List of the properties that were recorded for the species
        mass and charge are optional

    name: str
    n_particles: int
    properties: List[PropertyInfo]
    mass: float = None
    charge: float = 0

    def __eq__(self, other):
        same = (
            self.name == other.name
            and self.n_particles == other.n_particles
            and self.mass == other.mass
            and self.charge == other.charge
        if len(self.properties) != len(other.properties):
            return False

        for prop_s, prop_o in zip(self.properties, other.properties):
            same = same and prop_s == prop_o
        return same

class MoleculeInfo(SpeciesInfo):
    """Information about a Molecule.

    All the information of a species + groups

    groups: dict
        A molecule specific dictionary for mapping the molecule to the
        particles. The keys of this dict are index references to a specific molecule,
        i.e. molecule 1 and the values are a dict of atom species and their indices
        belonging to that specific molecule.
            water = {"groups": {"0": {"H": [0, 1], "O": [0]}}
        This tells us that the 0th water molecule consists of the 0th and 1st hydrogen
        atoms in the database as well as the 0th oxygen atom.

    groups: dict = None

    def __eq__(self, other):
        """Add a check to see if the groups are identical."""
        if self.groups != other.groups:
            return False
        return super(MoleculeInfo, self).__eq__(other)

class TrajectoryMetadata:
    """Trajectory Metadata container.

    This metadata must be extracted from trajectory files to build the database into
    which the trajectory will be stored.

    n_configurations : int
        Number of configurations of the whole trajectory.
    species_list: list of SpeciesInfo
        The information about all species in the system.
    box_l: list of float
        The simulation box size in three dimensions
    sample_rate : int optional
        The number of timesteps between consecutive samples
        # todo remove in favour of sample_step
    sample_step : int optional
        The time between consecutive configurations.
        E.g. for a simulation with time step 0.1 where the trajectory is written
        every 5 steps: sample_step = 0.5. Does not have to be specified
        (e.g. configurations from Monte Carlo scheme), but is needed for all
        dynamic observables.
    temperature : float optional
        The set temperature of the system.
        Optional because only applicable for MD simulations with thermostat.
        Needed for certain observables.
    simulation_data : str|Path, optional
        All other simulation data that can be extracted from the trajectory metadata.
        E.g. software version, pressure in NPT simulations, time step, ...

    n_configurations: int
    species_list: List[SpeciesInfo]
    box_l: list = None
    sample_rate: int = 1
    sample_step: float = None
    temperature: float = None
    simulation_data: dict = dataclasses.field(default_factory=dict)

class TrajectoryChunkData:
    """Class to specify the data format for transfer from the file to the database."""

    def __init__(self, species_list: List[SpeciesInfo], chunk_size: int):

        species_list : List[SpeciesInfo]
            List of SpeciesInfo.
            It contains the information which species are there and which properties
            are recorded for each
        chunk_size : int
            The number of configurations to be stored in this chunk
        self.chunk_size = chunk_size
        self.species_list = species_list
        self._data = {}
        for sp_info in species_list:
            self._data[sp_info.name] = {}
            for prop_info in sp_info.properties:
                self._data[sp_info.name][prop_info.name] = np.zeros(
                    (chunk_size, sp_info.n_particles, prop_info.n_dims)

    def add_data(self, data: np.ndarray, config_idx, species_name, property_name):
        Add configuration data to the chunk
            The data to be added, with shape (n_configs, n_particles, n_dims).
            n_particles and n_dims relates to the species and the property that is
            being added
            Start index of the configs that are being added.
            Name of the species to which the data belongs
            Name of the property being added.

        Storing velocity Information for 42 Na atoms in the 17th iteration of a loop
        that reads 5 configs per loop:
        add_data(vel_array, 16*5, 'Na', 'Velocities')
        where vel.data.shape == (5,42,3)

        n_configs = len(data)
            config_idx : config_idx + n_configs, :, :
        ] = data

    def get_data(self):
        return self._data

class Database:
    Database class.

    Databases make up a large part of the functionality of MDSuite and are kept
    fairly consistent in structure. Therefore, the database_path structure we
    are using has a separate class with commonly used methods which act as
    wrappers for the hdf5 database_path.

    path : str|Path
            The name of the database_path in question.

    def __init__(self, path: typing.Union[str, pathlib.Path] = "database"):
        Constructor for the database_path class.

        path : str|Path
                The name of the database_path in question.
        if isinstance(path, pathlib.Path):
            self.path = path.as_posix()
        elif isinstance(path, str):
            self.path = path  # name of the database_path
            # TODO fix this!
            log.debug(f"Expected str|Path but found {type(path)}")
            self.path = path

    def _update_indices(
        data: np.array, reference: np.array, batch_size: int, n_atoms: int
        Update the indices key of the structure dictionary if the tensor_values must be

        data : np.ndarray
        reference : np.ndarray
        batch_size : int
        n_atoms : int


        ids = np.reshape(np.array(data[:, 0]).astype(int), (-1, n_atoms))
        ref_ids = np.argsort(ids, axis=1)
        n_batches = ids.shape[0]

        return (
            ref_ids[:, reference - 1] + (np.arange(n_batches) * n_atoms)[None].T

    def _build_path_input(structure: dict) -> dict:
        Build an input to a hdf5 database_path from a dictionary.

        In many cases, whilst a dict can be passed on to a method, it is not ideal for
        use in the hdf5 database_path. This method takes a dictionary and return a new
        dictionary with the relevant file path.

        structure : dict
                General structure of the dictionary with relevant dataset sizes. e.g.
                {'Na': {'Forces': (200, 5000, 3)},
                'Pressure': (5000, 6), 'Temperature': (5000, 1)} In this case, the last
                 value in the tuple corresponds to the number of components that wil be
                 parsed to the database_path. The value in the dict can also be an
                 integer corresponding to a resize operation such as
                 {'Na': {'velocities' 100}}. In any case, the deepest portion of the
                 dict must be a non-dict object and will be returned as the value of the
                 path to it in the new dictionary.

        architecture : dict
                Corrected path in the hdf5 database_path. e.g. {'/Na/Velocities': 100},
                or {'/Na/Forces': (200, 5000, 3)}

        # Build file paths for the addition.
        architecture = {}
        for group in structure:
            if type(structure[group]) is not dict:
                architecture[group] = structure[group]
                for subgroup in structure[group]:
                    db_path = join_path(group, subgroup)
                    architecture[db_path] = structure[group][subgroup]

        return architecture

    def add_data(self, chunk: TrajectoryChunkData):
        Add new data to the dataset.

            a data chunk
            Configuration at which to start writing.
        workaround_time_in_axis_1 = True

        chunk_data = chunk.get_data()

        with hf.File(self.path, "r+") as database:
            for sp_info in chunk.species_list:
                for prop_info in sp_info.properties:
                    dataset_name = f"{sp_info.name}/{prop_info.name}"
                    write_data = chunk_data[sp_info.name][prop_info.name]

                    dataset_shape = database[dataset_name].shape
                    start_index = database[dataset_name].attrs["starting_index"]
                    stop_index = start_index + chunk.chunk_size

                    if len(dataset_shape) == 2:
                        # only one particle
                        database[dataset_name][start_index:stop_index, :] = write_data[
                            :, 0, :

                    elif len(dataset_shape) == 3:
                        if workaround_time_in_axis_1:
                            database[dataset_name][:, start_index:stop_index, :] = (
                                np.swapaxes(write_data, 0, 1)
                                start_index:stop_index, ...
                            ] = write_data
                        raise ValueError(
                            "dataset shape must be either (n_part,n_config,n_dim) or"
                            " (n_config, n_dim)"
                    database[dataset_name].attrs["starting_index"] += chunk.chunk_size

    def resize_datasets(self, structure: dict):
        Resize a dataset so more tensor_values can be added.

        structure : dict
                path to the dataset that needs to be resized. e.g.
                {'Na': {'velocities': (32, 100, 3)}}
                will resize all 'x', 'y', and 'z' datasets by 100 entries.


        with hf.File(self.path, "r+") as db:
            # construct the architecture dict
            architecture = self._build_path_input(structure=structure)

            # Check for a type error in the dataset information
            for identifier in architecture:
                dataset_information = architecture[identifier]
                if not isinstance(dataset_information, tuple):
                    raise TypeError("Invalid input for dataset generation")

                # get the correct maximum shape for the dataset -- changes if an
                # experiment property or an atomic property
                    if len(dataset_information[:-1]) == 1:
                        axis = 0
                        axis = 1

                    expansion = dataset_information[axis] + db[identifier].shape[axis]
                    db[identifier].resize(expansion, axis)

                # It is actually a new group
                except KeyError:
                    self.add_dataset({identifier: architecture[identifier]})

    def initialize_database(self, structure: dict):
        Build a database_path with a general structure.

        Note, this method WILL overwrite a pre-existing database_path. This is because
        it is only to be called on the initial construction of an experiment class and
        the first addition of tensor_values to it.

        structure : dict
                General structure of the dictionary with relevant dataset sizes.
                e.g. {'Na': {'Forces': (200, 5000, 3)}, 'Pressure': (5000, 6),
                'Temperature': (5000, 1)} In this case, the last value in the tuple
                corresponds to the number of components that wil be parsed to the


        architecture = self._build_path_input(structure)
        self.add_dataset(architecture)  # add a dataset to the groups

    def database_exists(self) -> bool:
        """Check if the database file already exists."""
        return pathlib.Path(self.path).exists()

    def add_dataset(self, architecture: dict):
        Add a dataset of the necessary size to the database_path.

        Just as a separate method exists for building the group structure of the hdf5
        database_path, so too do we include a separate method for adding a dataset.
        This is so datasets can be added not just upon the initial construction of the
        database_path, but also if tensor_values is added in the future that should
        also be stored. This method will assume that a group has already been built,
        although this is not necessary for HDF5, the separation of the actions is good

        architecture : dict
                Structure of properties to be added to the database_path.
                e.g. {'Na': {'Forces': (200, 5000, 3)}}

        Updates the database_path directly.
        with hf.File(self.path, "a") as database:
            for item in architecture:
                dataset_information = architecture[item]  # get the tuple information
                dataset_path = item  # get the dataset path in the database_path

                # Check for a type error in the dataset information
                    if type(dataset_information) is not tuple:
                        raise TypeError("Invalid input for dataset generation")
                except TypeError:
                    raise TypeError

                if len(dataset_information[:-1]) == 1:
                    vector_length = dataset_information[-1]
                    max_shape = (None, vector_length)
                    max_shape = list(dataset_information)
                    max_shape[1] = None
                    max_shape = tuple(max_shape)

                dataset = database[dataset_path]
                dataset.attrs["starting_index"] = 0

    def _add_group_structure(self, structure: dict):
        Add a simple group structure to a database_path.
        This method will take an input structure and build the required group structure
        in the hdf5 database_path. This will NOT however instantiate the dataset for the
        structure, only the group hierarchy.

        structure : dict
                Structure of a single property to be added to the database_path.
                e.g. {'Na': {'Forces': (200, 5000, 3)}}

        Updates the database_path directly.
        with hf.File(self.path, "a") as database:
            # Build file paths for the addition.
            architecture = self._build_path_input(structure=structure)
            for item in list(architecture):
                if item in database:
                    log.info("Group structure already exists")

    def get_memory_information(self) -> dict:
        Get memory information from the database_path.

        memory_database : dict
                A dictionary of the memory information of the groups in the
        with hf.File(self.path, "r") as database:
            memory_database = {}
            for item in database:
                for ds in database[item]:
                    memory_database[join_path(item, ds)] = database[item][ds].nbytes

        return memory_database

    def check_existence(self, path: str) -> bool:
        Check to see if a dataset is in the database_path.

        path : str
                Path to the desired dataset

        response : bool
                If true, the path exists, else, it does not.
        with hf.File(self.path, "r") as database_object:
            keys = []
                lambda item: (
                    if isinstance(database_object[item], hf.Dataset)
                    else None
            path = f"/{path}"  # add the / to avoid name overlapping

            response = any(list(item.endswith(path) for item in keys))
        return response

    def change_key_names(self, mapping: dict):
        Change the name of database_path keys.

        mapping : dict
                Mapping for the change of names

        Updates the database_path
        with hf.File(self.path, "r+") as db:
            groups = list(db.keys())

            for item in groups:
                if item in mapping:
                    db.move(item, mapping[item])

    def load_data(
        path_list: list = None,
        select_slice: np.s_ = np.s_[:],
        dictionary: bool = False,
        scaling: list = None,
        d_size: int = None,
        Load tensor_values from the database_path for some operation.

        Should be called by the tensor_values fetch class as this will ensure
        correct loading and pre-loading.


        if scaling is None:
            scaling = [1 for _ in range(len(path_list))]

        with hf.File(self.path, "r") as database:
            data = {}
            for i, item in enumerate(path_list):
                if type(select_slice) is dict:
                    # index is the particle species name in the full item as a str.
                    slice_index = item.decode().split("/")[0]
                    my_slice = select_slice[slice_index]
                    my_slice = select_slice
                    data[item] = (
                        tf.convert_to_tensor(database[item][my_slice], dtype=tf.float64)
                        * scaling[i]
                except TypeError:
                    data[item] = (
                            database[item][my_slice[0]][:, my_slice[1], :],
                        * scaling[i]
            data[str.encode("data_size")] = d_size

        return data

    def get_load_time(self, database_path: str = None):
        Calculate the open/close time of the database_path.

        database_path : str
                Database path on which to test the time.

        opening time : float
                Time taken to open and close the database_path
        if database_path is None:
            start = time.time()
            database_path = hf.File(self.path, "r")
            stop = time.time()
            start = time.time()
            database_path = hf.File(database_path, "r")
            stop = time.time()

        return stop - start

    def get_data_size(self, data_path: str) -> tuple:
        Return the size of a dataset as a tuple (n_rows, n_columns, n_bytes).

        data_path : str
                path to the tensor_values in the hdf5 database_path.

        dataset_properties : tuple
                Tuple of tensor_values about the dataset, e.g.
                (n_rows, n_columns, n_bytes)
        with hf.File(self.path, "r") as db:
            data_tuple = (

        return data_tuple

    def get_database_summary(self):
        Get a summary of the database properties.

        summary : list
                A list of properties that are in the database.
        with hf.File(self.path, "r") as db:
            return list(db.keys())