zincware/MDSuite

View on GitHub
mdsuite/database/simulation_database.py

Summary

Maintainability
D
1 day
Test Coverage
"""
MDSuite: A Zincwarecode package.

License
-------
This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html

SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincwarecode Project.

Contact Information
-------------------
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/

Citation
--------
If you use this module please cite us with:

Summary
-------
"""
import dataclasses
import logging
import pathlib
import time
import typing
from typing import List

import h5py as hf
import numpy as np
import tensorflow as tf

from mdsuite.utils.meta_functions import join_path

log = logging.getLogger(__name__)


@dataclasses.dataclass(frozen=True)
class PropertyInfo:
    """
    Information of a trajectory property.

    Example:
    -------
    pos_info = PropertyInfo('Positions', 3)
    vel_info = PropertyInfo('Velocities', 3).

    Attributes:
    ----------
    name:
        The name of the property
    n_dims:
        The dimensionality of the property
    """

    name: str
    n_dims: int


@dataclasses.dataclass
class SpeciesInfo:
    """
    Information of a species.

    Attributes
    ----------
    name
        Name of the species (e.g. 'Na')
    n_particles
        Number of particles of that species
    properties: list of PropertyInfo
        List of the properties that were recorded for the species
        mass and charge are optional
    """

    name: str
    n_particles: int
    properties: List[PropertyInfo]
    mass: float = None
    charge: float = 0

    def __eq__(self, other):
        same = (
            self.name == other.name
            and self.n_particles == other.n_particles
            and self.mass == other.mass
            and self.charge == other.charge
        )
        if len(self.properties) != len(other.properties):
            return False

        for prop_s, prop_o in zip(self.properties, other.properties):
            same = same and prop_s == prop_o
        return same


@dataclasses.dataclass
class MoleculeInfo(SpeciesInfo):
    """Information about a Molecule.

    All the information of a species + groups

    Attributes
    ----------
    groups: dict
        A molecule specific dictionary for mapping the molecule to the
        particles. The keys of this dict are index references to a specific molecule,
        i.e. molecule 1 and the values are a dict of atom species and their indices
        belonging to that specific molecule.
        e.g
            water = {"groups": {"0": {"H": [0, 1], "O": [0]}}
        This tells us that the 0th water molecule consists of the 0th and 1st hydrogen
        atoms in the database as well as the 0th oxygen atom.
    """

    groups: dict = None

    def __eq__(self, other):
        """Add a check to see if the groups are identical."""
        if self.groups != other.groups:
            return False
        return super(MoleculeInfo, self).__eq__(other)


@dataclasses.dataclass
class TrajectoryMetadata:
    """Trajectory Metadata container.

    This metadata must be extracted from trajectory files to build the database into
    which the trajectory will be stored.

    Attributes
    ----------
    n_configurations : int
        Number of configurations of the whole trajectory.
    species_list: list of SpeciesInfo
        The information about all species in the system.
    box_l: list of float
        The simulation box size in three dimensions
    sample_rate : int optional
        The number of timesteps between consecutive samples
        # todo remove in favour of sample_step
    sample_step : int optional
        The time between consecutive configurations.
        E.g. for a simulation with time step 0.1 where the trajectory is written
        every 5 steps: sample_step = 0.5. Does not have to be specified
        (e.g. configurations from Monte Carlo scheme), but is needed for all
        dynamic observables.
    temperature : float optional
        The set temperature of the system.
        Optional because only applicable for MD simulations with thermostat.
        Needed for certain observables.
    simulation_data : str|Path, optional
        All other simulation data that can be extracted from the trajectory metadata.
        E.g. software version, pressure in NPT simulations, time step, ...
    """

    n_configurations: int
    species_list: List[SpeciesInfo]
    box_l: list = None
    sample_rate: int = 1
    sample_step: float = None
    temperature: float = None
    simulation_data: dict = dataclasses.field(default_factory=dict)


class TrajectoryChunkData:
    """Class to specify the data format for transfer from the file to the database."""

    def __init__(self, species_list: List[SpeciesInfo], chunk_size: int):
        """

        Parameters
        ----------
        species_list : List[SpeciesInfo]
            List of SpeciesInfo.
            It contains the information which species are there and which properties
            are recorded for each
        chunk_size : int
            The number of configurations to be stored in this chunk
        """
        self.chunk_size = chunk_size
        self.species_list = species_list
        self._data = {}
        for sp_info in species_list:
            self._data[sp_info.name] = {}
            for prop_info in sp_info.properties:
                self._data[sp_info.name][prop_info.name] = np.zeros(
                    (chunk_size, sp_info.n_particles, prop_info.n_dims)
                )

    def add_data(self, data: np.ndarray, config_idx, species_name, property_name):
        """
        Add configuration data to the chunk
        Parameters
        ----------
        data:
            The data to be added, with shape (n_configs, n_particles, n_dims).
            n_particles and n_dims relates to the species and the property that is
            being added
        config_idx:
            Start index of the configs that are being added.
        species_name
            Name of the species to which the data belongs
        property_name
            Name of the property being added.

        Example:
        -------
        Storing velocity Information for 42 Na atoms in the 17th iteration of a loop
        that reads 5 configs per loop:
        add_data(vel_array, 16*5, 'Na', 'Velocities')
        where vel.data.shape == (5,42,3)

        """
        n_configs = len(data)
        self._data[species_name][property_name][
            config_idx : config_idx + n_configs, :, :
        ] = data

    def get_data(self):
        return self._data


class Database:
    """
    Database class.

    Databases make up a large part of the functionality of MDSuite and are kept
    fairly consistent in structure. Therefore, the database_path structure we
    are using has a separate class with commonly used methods which act as
    wrappers for the hdf5 database_path.

    Attributes
    ----------
    path : str|Path
            The name of the database_path in question.
    """

    def __init__(self, path: typing.Union[str, pathlib.Path] = "database"):
        """
        Constructor for the database_path class.

        Parameters
        ----------
        path : str|Path
                The name of the database_path in question.
        """
        if isinstance(path, pathlib.Path):
            self.path = path.as_posix()
        elif isinstance(path, str):
            self.path = path  # name of the database_path
        else:
            # TODO fix this!
            log.debug(f"Expected str|Path but found {type(path)}")
            self.path = path

    @staticmethod
    def _update_indices(
        data: np.array, reference: np.array, batch_size: int, n_atoms: int
    ):
        """
        Update the indices key of the structure dictionary if the tensor_values must be
        sorted.

        Parameters
        ----------
        data : np.ndarray
        reference : np.ndarray
        batch_size : int
        n_atoms : int

        Returns
        -------

        """
        ids = np.reshape(np.array(data[:, 0]).astype(int), (-1, n_atoms))
        ref_ids = np.argsort(ids, axis=1)
        n_batches = ids.shape[0]

        return (
            ref_ids[:, reference - 1] + (np.arange(n_batches) * n_atoms)[None].T
        ).flatten()

    @staticmethod
    def _build_path_input(structure: dict) -> dict:
        """
        Build an input to a hdf5 database_path from a dictionary.

        In many cases, whilst a dict can be passed on to a method, it is not ideal for
        use in the hdf5 database_path. This method takes a dictionary and return a new
        dictionary with the relevant file path.


        Parameters
        ----------
        structure : dict
                General structure of the dictionary with relevant dataset sizes. e.g.
                {'Na': {'Forces': (200, 5000, 3)},
                'Pressure': (5000, 6), 'Temperature': (5000, 1)} In this case, the last
                 value in the tuple corresponds to the number of components that wil be
                 parsed to the database_path. The value in the dict can also be an
                 integer corresponding to a resize operation such as
                 {'Na': {'velocities' 100}}. In any case, the deepest portion of the
                 dict must be a non-dict object and will be returned as the value of the
                 path to it in the new dictionary.


        Returns
        -------
        architecture : dict
                Corrected path in the hdf5 database_path. e.g. {'/Na/Velocities': 100},
                or {'/Na/Forces': (200, 5000, 3)}

        """
        # Build file paths for the addition.
        architecture = {}
        for group in structure:
            if type(structure[group]) is not dict:
                architecture[group] = structure[group]
            else:
                for subgroup in structure[group]:
                    db_path = join_path(group, subgroup)
                    architecture[db_path] = structure[group][subgroup]

        return architecture

    def add_data(self, chunk: TrajectoryChunkData):
        """
        Add new data to the dataset.

        Parameters
        ----------
        chunk:
            a data chunk
        start_idx:
            Configuration at which to start writing.
        """
        workaround_time_in_axis_1 = True

        chunk_data = chunk.get_data()

        with hf.File(self.path, "r+") as database:
            for sp_info in chunk.species_list:
                for prop_info in sp_info.properties:
                    dataset_name = f"{sp_info.name}/{prop_info.name}"
                    write_data = chunk_data[sp_info.name][prop_info.name]

                    dataset_shape = database[dataset_name].shape
                    start_index = database[dataset_name].attrs["starting_index"]
                    stop_index = start_index + chunk.chunk_size

                    if len(dataset_shape) == 2:
                        # only one particle
                        database[dataset_name][start_index:stop_index, :] = write_data[
                            :, 0, :
                        ]

                    elif len(dataset_shape) == 3:
                        if workaround_time_in_axis_1:
                            database[dataset_name][:, start_index:stop_index, :] = (
                                np.swapaxes(write_data, 0, 1)
                            )
                        else:
                            database[dataset_name][
                                start_index:stop_index, ...
                            ] = write_data
                    else:
                        raise ValueError(
                            "dataset shape must be either (n_part,n_config,n_dim) or"
                            " (n_config, n_dim)"
                        )
                    database[dataset_name].attrs["starting_index"] += chunk.chunk_size

    def resize_datasets(self, structure: dict):
        """
        Resize a dataset so more tensor_values can be added.

        Parameters
        ----------
        structure : dict
                path to the dataset that needs to be resized. e.g.
                {'Na': {'velocities': (32, 100, 3)}}
                will resize all 'x', 'y', and 'z' datasets by 100 entries.

        Returns
        -------

        """
        with hf.File(self.path, "r+") as db:
            # construct the architecture dict
            architecture = self._build_path_input(structure=structure)

            # Check for a type error in the dataset information
            for identifier in architecture:
                dataset_information = architecture[identifier]
                if not isinstance(dataset_information, tuple):
                    raise TypeError("Invalid input for dataset generation")

                # get the correct maximum shape for the dataset -- changes if an
                # experiment property or an atomic property
                try:
                    if len(dataset_information[:-1]) == 1:
                        axis = 0
                    else:
                        axis = 1

                    expansion = dataset_information[axis] + db[identifier].shape[axis]
                    db[identifier].resize(expansion, axis)

                # It is actually a new group
                except KeyError:
                    self.add_dataset({identifier: architecture[identifier]})

    def initialize_database(self, structure: dict):
        """
        Build a database_path with a general structure.

        Note, this method WILL overwrite a pre-existing database_path. This is because
        it is only to be called on the initial construction of an experiment class and
        the first addition of tensor_values to it.


        Parameters
        ----------
        structure : dict
                General structure of the dictionary with relevant dataset sizes.
                e.g. {'Na': {'Forces': (200, 5000, 3)}, 'Pressure': (5000, 6),
                'Temperature': (5000, 1)} In this case, the last value in the tuple
                corresponds to the number of components that wil be parsed to the
                database_path.

        Returns
        -------

        """
        architecture = self._build_path_input(structure)
        self.add_dataset(architecture)  # add a dataset to the groups

    def database_exists(self) -> bool:
        """Check if the database file already exists."""
        return pathlib.Path(self.path).exists()

    def add_dataset(self, architecture: dict):
        """
        Add a dataset of the necessary size to the database_path.

        Just as a separate method exists for building the group structure of the hdf5
        database_path, so too do we include a separate method for adding a dataset.
        This is so datasets can be added not just upon the initial construction of the
        database_path, but also if tensor_values is added in the future that should
        also be stored. This method will assume that a group has already been built,
        although this is not necessary for HDF5, the separation of the actions is good
        practice.

        Parameters
        ----------
        architecture : dict
                Structure of properties to be added to the database_path.
                e.g. {'Na': {'Forces': (200, 5000, 3)}}

        Returns
        -------
        Updates the database_path directly.
        """
        with hf.File(self.path, "a") as database:
            for item in architecture:
                dataset_information = architecture[item]  # get the tuple information
                dataset_path = item  # get the dataset path in the database_path

                # Check for a type error in the dataset information
                try:
                    if type(dataset_information) is not tuple:
                        raise TypeError("Invalid input for dataset generation")
                except TypeError:
                    raise TypeError

                if len(dataset_information[:-1]) == 1:
                    vector_length = dataset_information[-1]
                    max_shape = (None, vector_length)
                else:
                    max_shape = list(dataset_information)
                    max_shape[1] = None
                    max_shape = tuple(max_shape)

                database.create_dataset(
                    dataset_path,
                    dataset_information,
                    maxshape=max_shape,
                    compression="gzip",
                    chunks=True,
                )
                dataset = database[dataset_path]
                dataset.attrs["starting_index"] = 0

    def _add_group_structure(self, structure: dict):
        """
        Add a simple group structure to a database_path.
        This method will take an input structure and build the required group structure
        in the hdf5 database_path. This will NOT however instantiate the dataset for the
        structure, only the group hierarchy.


        Parameters
        ----------
        structure : dict
                Structure of a single property to be added to the database_path.
                e.g. {'Na': {'Forces': (200, 5000, 3)}}

        Returns
        -------
        Updates the database_path directly.
        """
        with hf.File(self.path, "a") as database:
            # Build file paths for the addition.
            architecture = self._build_path_input(structure=structure)
            for item in list(architecture):
                if item in database:
                    log.info("Group structure already exists")
                else:
                    database.create_group(item)

    def get_memory_information(self) -> dict:
        """
        Get memory information from the database_path.

        Returns
        -------
        memory_database : dict
                A dictionary of the memory information of the groups in the
                database_path
        """
        with hf.File(self.path, "r") as database:
            memory_database = {}
            for item in database:
                for ds in database[item]:
                    memory_database[join_path(item, ds)] = database[item][ds].nbytes

        return memory_database

    def check_existence(self, path: str) -> bool:
        """
        Check to see if a dataset is in the database_path.

        Parameters
        ----------
        path : str
                Path to the desired dataset

        Returns
        -------
        response : bool
                If true, the path exists, else, it does not.
        """
        with hf.File(self.path, "r") as database_object:
            keys = []
            database_object.visit(
                lambda item: (
                    keys.append(database_object[item].name)
                    if isinstance(database_object[item], hf.Dataset)
                    else None
                )
            )
            path = f"/{path}"  # add the / to avoid name overlapping

            response = any(list(item.endswith(path) for item in keys))
        return response

    def change_key_names(self, mapping: dict):
        """
        Change the name of database_path keys.

        Parameters
        ----------
        mapping : dict
                Mapping for the change of names

        Returns
        -------
        Updates the database_path
        """
        with hf.File(self.path, "r+") as db:
            groups = list(db.keys())

            for item in groups:
                if item in mapping:
                    db.move(item, mapping[item])

    def load_data(
        self,
        path_list: list = None,
        select_slice: np.s_ = np.s_[:],
        dictionary: bool = False,
        scaling: list = None,
        d_size: int = None,
    ):
        """
        Load tensor_values from the database_path for some operation.

        Should be called by the tensor_values fetch class as this will ensure
        correct loading and pre-loading.

        Returns
        -------

        """
        if scaling is None:
            scaling = [1 for _ in range(len(path_list))]

        with hf.File(self.path, "r") as database:
            data = {}
            for i, item in enumerate(path_list):
                if type(select_slice) is dict:
                    # index is the particle species name in the full item as a str.
                    slice_index = item.decode().split("/")[0]
                    my_slice = select_slice[slice_index]
                else:
                    my_slice = select_slice
                try:
                    data[item] = (
                        tf.convert_to_tensor(database[item][my_slice], dtype=tf.float64)
                        * scaling[i]
                    )
                except TypeError:
                    data[item] = (
                        tf.convert_to_tensor(
                            database[item][my_slice[0]][:, my_slice[1], :],
                            dtype=tf.float64,
                        )
                        * scaling[i]
                    )
            data[str.encode("data_size")] = d_size

        return data

    def get_load_time(self, database_path: str = None):
        """
        Calculate the open/close time of the database_path.

        Parameters
        ----------
        database_path : str
                Database path on which to test the time.

        Returns
        -------
        opening time : float
                Time taken to open and close the database_path
        """
        if database_path is None:
            start = time.time()
            database_path = hf.File(self.path, "r")
            database_path.close()
            stop = time.time()
        else:
            start = time.time()
            database_path = hf.File(database_path, "r")
            database_path.close()
            stop = time.time()

        return stop - start

    def get_data_size(self, data_path: str) -> tuple:
        """
        Return the size of a dataset as a tuple (n_rows, n_columns, n_bytes).

        Parameters
        ----------
        data_path : str
                path to the tensor_values in the hdf5 database_path.

        Returns
        -------
        dataset_properties : tuple
                Tuple of tensor_values about the dataset, e.g.
                (n_rows, n_columns, n_bytes)
        """
        with hf.File(self.path, "r") as db:
            data_tuple = (
                db[data_path].shape[0],
                db[data_path].shape[1],
                db[data_path].nbytes,
            )

        return data_tuple

    def get_database_summary(self):
        """
        Get a summary of the database properties.

        Returns
        -------
        summary : list
                A list of properties that are in the database.
        """
        with hf.File(self.path, "r") as db:
            return list(db.keys())